diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid')
14 files changed, 1044 insertions, 351 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b64d355f80..2a91ff3ce2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); + private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private final long _connectionNumber; /** @@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); - private static final long DEFAULT_TIMEOUT = 1000 * 30; private AMQConnectionDelegate _delegate; @@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - close(DEFAULT_TIMEOUT); + close(DEFAULT_CLOSE_TIMEOUT); } public void close(long timeout) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 5242629a91..9650ad76fb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; private boolean _messageCompressionSupported; + private boolean _addrSyntaxSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public AMQConnectionDelegate_8_0(AMQConnection conn) { _conn = conn; + _addrSyntaxSupported = + Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8, + String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT))); } protected boolean checkException(Throwable thrown) @@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _messageCompressionSupported; } + + public boolean isAddrSyntaxSupported() + { + return _addrSyntaxSupported; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index e06fc0f1de..2714caf2a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,20 @@ */ package org.apache.qpid.client; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Destination; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import javax.jms.Destination; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - public abstract class AMQDestination implements Destination, Referenceable, Externalizable { @@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _address = addr; } - public int getAddressType(){ + public int getAddressType() + { return _addressType; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c2659194e2..0183c30276 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ListMessage; @@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; /* * TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For @@ -310,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _immediatePrefetch; } + abstract void handleNodeDelete(final AMQDestination dest) throws AMQException; + + abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -600,6 +607,128 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public void setLegacyFieldsForQueueType(AMQDestination dest) + { + // legacy support + dest.setQueueName(new AMQShortString(dest.getAddressName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + + public void setLegacyFieldsForTopicType(AMQDestination dest) + { + // legacy support + dest.setExchangeName(new AMQShortString(dest.getAddressName())); + Node node = dest.getNode(); + dest.setExchangeClass(node.getExchangeType() == null? + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): + new AMQShortString(node.getExchangeType())); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + } + + protected void verifySubject(AMQDestination dest) throws AMQException + { + if (dest.getSubject() == null || dest.getSubject().trim().equals("")) + { + + if ("topic".equals(dest.getExchangeClass().toString())) + { + dest.setRoutingKey(new AMQShortString("#")); + dest.setSubject(dest.getRoutingKey().toString()); + } + else + { + dest.setRoutingKey(new AMQShortString("")); + dest.setSubject(""); + } + } + } + + public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException; + + public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException; + + /** + * 1. Try to resolve the address type (queue or exchange) + * 2. if type == queue, + * 2.1 verify queue exists or create if create == true + * 2.2 If not throw exception + * + * 3. if type == exchange, + * 3.1 verify exchange exists or create if create == true + * 3.2 if not throw exception + * 3.3 if exchange exists (or created) create subscription queue. + */ + + @SuppressWarnings("deprecation") + public void resolveAddress(AMQDestination dest, + boolean isConsumer, + boolean noLocal) throws AMQException + { + if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) + { + return; + } + else + { + boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) || + (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) || + (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER); + + boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) || + (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) || + (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER); + + + + int type = resolveAddressType(dest); + + switch (type) + { + case AMQDestination.QUEUE_TYPE: + { + if(createNode) + { + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); + break; + } + else if (isQueueExist(dest,assertNode)) + { + setLegacyFieldsForQueueType(dest); + break; + } + } + + case AMQDestination.TOPIC_TYPE: + { + if(createNode) + { + setLegacyFieldsForTopicType(dest); + verifySubject(dest); + handleExchangeNodeCreation(dest); + break; + } + else if (isExchangeExist(dest,assertNode)) + { + setLegacyFieldsForTopicType(dest); + verifySubject(dest); + break; + } + } + + default: + throw new AMQException( + "The name '" + dest.getAddressName() + + "' supplied in the address doesn't resolve to an exchange or a queue"); + } + dest.setAddressResolved(System.currentTimeMillis()); + } + } + + public abstract int resolveAddressType(AMQDestination dest) throws AMQException; + protected abstract void acknowledgeImpl() throws JMSException; /** @@ -2594,6 +2723,54 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + + void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doBind(dest, binding, queue, exchange); + } + } + + protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException; + + abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException; + + abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) + throws AMQException; + public abstract void sendConsume(C consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException; @@ -2696,7 +2873,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @throws AMQException If the exchange cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - private void declareExchange(final AMQShortString name, final AMQShortString type, + void declareExchange(final AMQShortString name, final AMQShortString type, final boolean nowait, final boolean durable, final boolean autoDelete, final boolean internal) throws AMQException { @@ -2710,9 +2887,53 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic }, _connection).execute(); } + void declareExchange(final AMQShortString name, final AMQShortString type, + final boolean nowait, final boolean durable, + final boolean autoDelete, final FieldTable arguments, + final boolean passive) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive); + return null; + } + }, _connection).execute(); + } + + protected AMQShortString preprocessAddressTopic(final C consumer, + AMQShortString queueName) throws AMQException + { + if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } + return queueName; + } + + abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException; + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; + + public abstract void sendExchangeDeclare(final AMQShortString name, + final AMQShortString type, + final boolean nowait, + boolean durable, + boolean autoDelete, + FieldTable arguments, + final boolean passive) throws AMQException, FailoverException; + /** * Declares a queue for a JMS destination. * <p> @@ -2930,10 +3151,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) throws AMQException; - public abstract void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException; - private void registerProducer(long producerId, MessageProducer producer) { _producers.put(producerId, producer); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 19720ea386..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; @@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; @@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); + doBind(destination, binding, queue, exchange); } } @@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean nowait, int tag) throws AMQException, FailoverException { - if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) - { - if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) - { - String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); - - createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); - queueName = consumer.getDestination().getAMQQueueName(); - consumer.setQueuename(queueName); - } - handleLinkCreation(consumer.getDestination()); - } + queueName = preprocessAddressTopic(consumer, queueName); boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + sendExchangeDeclare(name.asString(), type.asString(), null, + arguments == null ? null : FieldTableSupport.convertToMap(arguments), + nowait, durable, autoDelete); + } + + public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, final boolean nowait, boolean durable, boolean autoDelete) throws AMQException @@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } + @Override public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; @@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } + @Override public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { Node node = dest.getNode(); @@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - /** - * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, - * 2.1 verify queue exists or create if create == true - * 2.2 If not throw exception - * - * 3. if type == exchange, - * 3.1 verify exchange exists or create if create == true - * 3.2 if not throw exception - * 3.3 if exchange exists (or created) create subscription queue. - */ - - @SuppressWarnings("deprecation") - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException - { - if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) - { - return; - } - else - { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - switch (type) - { - case AMQDestination.QUEUE_TYPE: - { - if(createNode) - { - setLegacyFieldsForQueueType(dest); - handleQueueNodeCreation(dest,noLocal); - break; - } - else if (isQueueExist(dest,assertNode)) - { - setLegacyFieldsForQueueType(dest); - break; - } - } - - case AMQDestination.TOPIC_TYPE: - { - if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - handleExchangeNodeCreation(dest); - break; - } - else if (isExchangeExist(dest,assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - break; - } - } - - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); - } - dest.setAddressResolved(System.currentTimeMillis()); - } - } - + @Override public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); @@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void verifySubject(AMQDestination dest) throws AMQException - { - if (dest.getSubject() == null || dest.getSubject().trim().equals("")) - { - - if ("topic".equals(dest.getExchangeClass().toString())) - { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); - } - else - { - dest.setRoutingKey(new AMQShortString("")); - dest.setSubject(""); - } - } - } - + @Override void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { Link link = dest.getLink(); @@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic bindingArguments); } - public void setLegacyFieldsForQueueType(AMQDestination dest) - { - // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - - public void setLegacyFiledsForTopicType(AMQDestination dest) - { - // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); - Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? - AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - } - protected void acknowledgeImpl() { RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); @@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + @Override + protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException { Node node = dest.getNode(); Map<String,Object> arguments = node.getDeclareArgs(); @@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } + @Override void handleExchangeNodeCreation(AMQDestination dest) throws AMQException { Node node = dest.getNode(); @@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } - void handleLinkCreation(AMQDestination dest) throws AMQException - { - createBindings(dest, dest.getLink().getBindings()); - } - - void createBindings(AMQDestination dest, List<Binding> bindings) + protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange) { - String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest - .getAddressName() : "amq.topic"; - - String defaultQueueName = null; - if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) - { - defaultQueueName = dest.getQueueName(); - } - else - { - defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); - } - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - defaultQueueName: binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchangeForBinding : - binding.getExchange(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + Strings.printMap(binding.getArgs())); - } - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); } void handleLinkDelete(AMQDestination dest) throws AMQException @@ -1614,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + @Override void handleNodeDelete(AMQDestination dest) throws AMQException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index dbbc300910..0145d15111 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -29,7 +29,9 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; @@ -48,10 +50,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; @@ -59,6 +65,7 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -170,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest, + final AMQShortString exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody - (getTicket(),queueName,exchangeName,routingKey,false,arguments). - generateFrame(getChannelId()), QueueBindOkBody.class); + if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) + { + + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody + (getTicket(), queueName, exchangeName, routingKey, false, arguments). + generateFrame(getChannelId()), QueueBindOkBody.class); + + } + else + { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. + List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>(); + bindings.addAll(destination.getNode().getBindings()); + + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? + destination.getAddressName(): "amq.topic"; + + for (AMQDestination.Binding binding: bindings) + { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchange : + binding.getExchange(); + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + doBind(destination, binding, queue, exchange); + } + } } public void sendClose(long timeout) throws AMQException, FailoverException @@ -230,9 +274,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe table.setObject(entry.getKey(), entry.getValue()); } } - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); - AMQFrame queueDeclare = body.generateFrame(getChannelId()); - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + sendQueueDeclare(name, durable, exclusive, autoDelete, table, false); } public void sendRecover() throws AMQException, FailoverException @@ -428,6 +470,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return (responseBody.getReplyCode() == 0); } + + protected boolean exchangeExists(final AMQShortString exchangeName) + throws AMQException + { + if(!getAMQConnection().getDelegate().supportsIsBound()) + { + return false; + } + + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, null, null); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + // valid if no issues, or just no bindings + return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3); + } + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) throws AMQException, FailoverException @@ -444,6 +512,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe boolean nowait, int tag) throws AMQException, FailoverException { + queueName = preprocessAddressTopic(consumer, queueName); BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, @@ -468,6 +537,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } @Override + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException + { + final Link link = dest.getLink(); + final String queueName ; + + if (dest.getQueueName() == null) + { + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); + } + else + { + queueName = dest.getQueueName(); + } + + final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue(); + final Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + } + + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + // not setting alternate exchange + sendQueueDeclare(AMQShortString.valueOf(queueName), + link.isDurable(), + queueProps.isExclusive(), + queueProps.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + Map<String,Object> bindingArguments = new HashMap<String, Object>(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); + doBind(dest, binding, queueName, dest.getAddressName()); + + } + + @Override public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { @@ -481,17 +605,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + @Override + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + + MethodRegistry methodRegistry = getMethodRegistry(); + ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(), + name, + type, + passive || name.toString().startsWith("amq."), + durable, + autoDelete, + false, + false, + arguments); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + + public void sendExchangeDelete(final String name) throws AMQException, FailoverException + { + ExchangeDeleteBody body = + getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { + AMQShortString queueName = amqd.getAMQQueueName(); + boolean durable = amqd.isDurable(); + boolean exclusive = amqd.isExclusive(); + boolean autoDelete = amqd.isAutoDelete(); + FieldTable arguments = null; + sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive); + } + + private void sendQueueDeclare(final AMQShortString queueName, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, final FieldTable arguments, final boolean passive) + throws AMQException, FailoverException + { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), + queueName, passive, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), + durable, + exclusive, + autoDelete, false, - null); + arguments); AMQFrame queueDeclare = body.generateFrame(getChannelId()); @@ -699,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { - AMQFrame queueDeclare = - getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), - true, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), - false, - null).generateFrame(getChannelId()); - QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); - return okHandler.getMessageCount(); + if(isBound(null, amqd.getAMQQueueName(), null)) + { + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(getChannelId()); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + return okHandler.getMessageCount(); + } + else + { + return 0l; + } } protected boolean tagLE(long tag1, long tag2) @@ -733,14 +908,387 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException + @Override + public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal) + throws AMQException + { + if(!isAddrSyntaxSupported()) + { + throw new UnsupportedAddressSyntaxException(dest); + } + super.resolveAddress(dest, isConsumer, noLocal); + } + + private boolean isAddrSyntaxSupported() + { + return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported(); + } + + public int resolveAddressType(AMQDestination dest) throws AMQException + { + int type = dest.getAddressType(); + String name = dest.getAddressName(); + if (type != AMQDestination.UNKNOWN_TYPE) + { + return type; + } + else + { + boolean isExchange = exchangeExists(AMQShortString.valueOf(name)); + boolean isQueue = isBound(null,AMQShortString.valueOf(name), null); + + if (!isExchange && !isQueue) + { + type = dest instanceof AMQTopic ? AMQDestination.TOPIC_TYPE : AMQDestination.QUEUE_TYPE; + } + else if (!isExchange) + { + //name refers to a queue + type = AMQDestination.QUEUE_TYPE; + } + else if (!isQueue) + { + //name refers to an exchange + type = AMQDestination.TOPIC_TYPE; + } + else + { + //both a queue and exchange exist for that name + throw new AMQException("Ambiguous address, please specify queue or topic as node type"); + } + dest.setAddressType(type); + return type; + } + } + + protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException + { + final Node node = dest.getNode(); + final Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + String altExchange = node.getAlternateExchange(); + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()), + node.isDurable(), + node.isExclusive(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + String altExchange = dest.getNode().getAlternateExchange(); + Map<String, Object> arguments = node.getDeclareArgs(); + + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + + // can't set alt. exchange + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), false); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + + protected void doBind(final AMQDestination dest, + final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException { - throw new UnsupportedAddressSyntaxException(dest); + final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + QueueBindBody queueBindBody = + methodRegistry.createQueueBindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + false, + FieldTable.convertToFieldTable(binding.getArgs())); + + getProtocolHandler().syncWrite(queueBindBody. + generateFrame(getChannelId()), QueueBindOkBody.class); + return null; + } + }, getAMQConnection()).execute(); + } + protected void doUnbind(final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + if (isBound(null, AMQShortString.valueOf(queue), null)) + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + AMQMethodBody body; + if (methodRegistry instanceof MethodRegistry_0_9) + { + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; + body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + null); + } + else if (methodRegistry instanceof MethodRegistry_0_91) + { + MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; + body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(binding.getBindingKey()), + null); + + } + else + { + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } + getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); + return null; + } + else + { + return null; + } + } + }, getAMQConnection()).execute(); + } + + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException + { + Node node = dest.getNode(); + return isQueueExist(dest.getAddressName(), assertNode, + node.isDurable(), node.isAutoDelete(), + node.isExclusive(), node.getDeclareArgs()); + } + + public boolean isQueueExist(final String queueName, boolean assertNode, + final boolean durable, final boolean autoDelete, + final boolean exclusive, final Map<String, Object> args) throws AMQException + { + boolean match = isBound(null,AMQShortString.valueOf(queueName), null); + + if (assertNode) + { + if(!match) + { + throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." ); + + } + else + { + + new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(queueName), + durable, + exclusive, + autoDelete, + FieldTable.convertToFieldTable(args), + true); + + return null; + } + }, getAMQConnection()); + + } + } + + + return match; + } + + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException + { + boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName())); + + Node node = dest.getNode(); + + if (match) + { + if (assertNode) + { + + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(node.getDeclareArgs()), true); + + } + else + { + // TODO - some way to determine the exchange type + /* + _logger.debug("Setting Exchange type " + result.getType()); + node.setExchangeType(result.getType()); + dest.setExchangeClass(new AMQShortString(result.getType())); + */ + + } + } + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +". Exchange not found."); + } + } + + return match; + } + + @Override + void handleNodeDelete(final AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDelete(dest.getAddressName()); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + else + { + if (isQueueExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + } + + @Override + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doUnbind(binding, queue, exchange); + } + } + + + void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest.getQueueName(), false, false, false, false, null)) + { + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + return null; + } + }, getAMQConnection())).execute(); + + } + } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01e89b78c1..187be8522c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,19 +20,35 @@ */ package org.apache.qpid.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ public boolean isExclusive() { - return _exclusive; + + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } } public boolean isReceiving() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 658fb25ce4..8f91a7db08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,12 +17,18 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This is a 0.10 message consumer. */ @@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } - - public boolean isExclusive() - { - AMQDestination dest = this.getDestination(); - if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) - { - return true; - } - else - { - return dest.getLink().getSubscription().isExclusive(); - } - } - else - { - return super.isExclusive(); - } - } + void postSubscription() throws AMQException { @@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).handleNodeDelete(dest); + getSession().handleNodeDelete(dest); } // Subscription queue is handled as part of linkDelete method. - ((AMQSession_0_10) getSession()).handleLinkDelete(dest); + getSession().handleLinkDelete(dest); if (!isDurableSubscriber()) { ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); @@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index f735895c81..cdffc73932 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client; +import javax.jms.JMSException; +import javax.jms.Message; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; -import javax.jms.JMSException; -import javax.jms.Message; - public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { private final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe _topicDestinationCache = session.getTopicDestinationCache(); _queueDestinationCache = session.getQueueDestinationCache(); + + // This is due to the Destination carrying the temporary subscription name which is incorrect. + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) + { + boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; + + if (!namedQueue) + { + setDestination(destination.copyDestination()); + getDestination().setQueueName(null); + } + } + if (destination.getRejectBehaviour() != null) { _rejectBehaviour = destination.getRejectBehaviour(); @@ -105,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe final AMQFrame cancelFrame = body.generateFrame(getChannelId()); getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); - + postSubscription(); + getSession().sync(); if (_logger.isDebugEnabled()) { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } } + void postSubscription() throws AMQException + { + AMQDestination dest = this.getDestination(); + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.RECEIVER ) + { + getSession().handleNodeDelete(dest); + } + // Subscription queue is handled as part of linkDelete method. + getSession().handleLinkDelete(dest); + if (!isDurableSubscriber()) + { + getSession().deleteSubscriptionQueue(dest); + } + } + } + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 33bafe8f20..1d47ce9a07 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.UUID; + import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,13 +33,15 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; + +import org.slf4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { setClosed(); _session.deregisterProducer(_producerId); + AMQDestination dest = getAMQDestination(); + AMQSession ssn = getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + try + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.SENDER ) + { + ssn.handleNodeDelete(dest); + } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; + } + } } public void send(Message message) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index eb8104b02c..06a3b08272 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; @@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { getSession().resolveAddress(destination,false,false); - ((AMQSession_0_10)getSession()).handleLinkCreation(destination); - ((AMQSession_0_10)getSession()).sync(); + getSession().handleLinkCreation(destination); + getSession().sync(); } catch(Exception e) { @@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public void close() throws JMSException { super.close(); - AMQDestination dest = getAMQDestination(); - AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); - if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - try - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - ssn.handleNodeDelete(dest); - } - ssn.handleLinkDelete(dest); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } - catch (AMQException e) - { - JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); - ex.setLinkedException(e); - ex.initCause(e); - throw ex; - } - } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 355c456249..e1b399e10a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.util.GZIPUtils; @@ -57,30 +60,37 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory); } - void declareDestination(AMQDestination destination) + void declareDestination(AMQDestination destination) throws AMQException { if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - throw new UnsupportedAddressSyntaxException(destination); - } + getSession().resolveAddress(destination, false, false); - if(getSession().isDeclareExchanges()) + getSession().handleLinkCreation(destination); + getSession().sync(); + } + else { - final MethodRegistry methodRegistry = getSession().getMethodRegistry(); - ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getExchangeClass(), - destination.getExchangeName().toString().startsWith("amq."), - destination.isExchangeDurable(), - destination.isExchangeAutoDelete(), - destination.isExchangeInternal(), - true, - null); - AMQFrame declare = body.generateFrame(getChannelId()); - - getConnection().getProtocolHandler().writeFrame(declare); + if (getSession().isDeclareExchanges()) + { + final MethodRegistry methodRegistry = getSession().getMethodRegistry(); + ExchangeDeclareBody body = + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), + destination.getExchangeName(), + destination.getExchangeClass(), + destination.getExchangeName() + .toString() + .startsWith("amq."), + destination.isExchangeDurable(), + destination.isExchangeAutoDelete(), + destination.isExchangeInternal(), + true, + null); + AMQFrame declare = body.generateFrame(getChannelId()); + + getConnection().getProtocolHandler().writeFrame(declare); + } } } @@ -88,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { + + + + AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); + BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); + + AMQShortString routingKey = destination.getRoutingKey(); + + FieldTable headers = delegate.getContentHeaderProperties().getHeaders(); + + if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && + (destination.getSubject() != null + || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null))) + { + + if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null) + { + // use default subject in address string + headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject()); + } + + if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + { + routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT)); + } + } + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); + destination.getExchangeName(), + routingKey, + mandatory, + immediate); AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); - AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); - BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); contentHeaderProperties.setUserId(getUserID()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index ad9a37479e..bd089eb6a8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -21,6 +21,23 @@ package org.apache.qpid.client.message; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +45,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; @@ -42,22 +58,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.TransportException; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - /** * This extends AbstractAMQMessageDelegate which contains common code between * both the 0_8 and 0_10 Message types. @@ -352,14 +352,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { try { - int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); + int type = getAMQSession().resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd); + getAMQSession().setLegacyFieldsForQueueType(amqd); } else { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd); + getAMQSession().setLegacyFieldsForTopicType(amqd); } } catch(AMQException ex) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 21f1623dd1..747668ff9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T> { _waiting.set(true); - while (!_ready) + while (!_ready && _error == null) { try { |