diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client')
9 files changed, 102 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index dbd742070e..ee3e0767d4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1452,16 +1452,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Not a hard-error connection not closing: " + cause); } - - // deliver the exception if there is a listener - if (_exceptionListener != null) - { - _exceptionListener.onException(je); - } - else - { - _logger.error("Throwable Received but no listener set: " + cause); - } // if we are closing the connection, close sessions first if (closer) @@ -1475,6 +1465,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.error("Error closing all sessions: " + e, e); } } + + // deliver the exception if there is a listener + if (_exceptionListener != null) + { + _exceptionListener.onException(je); + } + else + { + _logger.error("Throwable Received but no listener set: " + cause); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 24e5253cc8..75f71a99c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic preAcquire = !consumer.isNoConsume() && (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); + + arguments.putAll( + (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); } - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 905bf5e111..4bac54b3e4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private final boolean _exclusive; + protected boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per @@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - + _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; _noConsume = noConsume; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index d0f1f79631..699b52a6b1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -490,4 +490,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } + + public boolean isExclusive() + { + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index cae11e3962..32c7ef29de 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -223,8 +223,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate String exchange = replyTo.getExchange(); String routingKey = replyTo.getRoutingKey(); - dest = generateDestination(exchange == null ? null : new AMQShortString(exchange), - routingKey == null ? null : new AMQShortString(routingKey)); + dest = generateDestination(exchange == null ? new AMQShortString("") : + new AMQShortString(exchange), + routingKey == null ? new AMQShortString(""): + new AMQShortString(routingKey)); _destinationCache.put(replyTo, new SoftReference<Destination>(dest)); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 64d5b16db0..00503cc650 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; @@ -264,6 +265,7 @@ public class AddressHelper public Link getLink() { Link link = new Link(); + link.setSubscription(new Subscription()); if (linkProps != null) { link.setDurable(linkProps.getBoolean(DURABLE) == null ? false @@ -283,7 +285,8 @@ public class AddressHelper .setProducerCapacity(capacityProps .getInt(CAPACITY_TARGET) == null ? 0 : capacityProps.getInt(CAPACITY_TARGET)); - } else + } + else { int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps .getInt(CAPACITY); @@ -292,6 +295,21 @@ public class AddressHelper } link.setFilter(linkProps.getString(FILTER)); // so far filter type not used + + if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + { + Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; + + link.getSubscription().setExclusive(exclusive); + } } return link; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 0ebcaf548b..a7d19d1bd5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.client.messaging.address.Node.QueueNode; public class Link @@ -34,6 +37,7 @@ public class Link protected int _consumerCapacity = 0; protected int _producerCapacity = 0; protected Node node; + protected Subscription subscription; public Node getNode() { @@ -114,4 +118,40 @@ public class Link { this.name = name; } + + public Subscription getSubscription() + { + return this.subscription; + } + + public void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private boolean exclusive = false; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index edfb4bb16b..10250a1ac0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -66,7 +66,6 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.network.Transport; import org.slf4j.Logger; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 4236f20301..44376331ee 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -112,8 +112,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); _connection = connection; } |