diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 408 |
1 files changed, 290 insertions, 118 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8a7c6b1a01..8490a724bf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,6 +17,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; @@ -29,8 +34,10 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; + import javax.jms.Destination; import javax.jms.JMSException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -44,18 +51,32 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; +import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; -import static org.apache.qpid.transport.Option.BATCH; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -347,15 +368,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. List<Binding> bindings = new ArrayList<Binding>(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); + bindings.addAll(destination.getNode().getBindings()); String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; for (Binding binding: bindings) { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } String queue = binding.getQueue() == null? queueName.asString(): binding.getQueue(); @@ -523,11 +551,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, - getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh, - prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); + getMessageFactoryRegistry(), this, rawSelector, prefetchHigh, prefetchLow, + exclusive, getAcknowledgeMode(), noConsume, autoClose); } /** @@ -558,7 +584,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic rk = routingKey.toString(); } - return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) @@ -591,10 +617,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * This method is invoked when a consumer is created * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException - { + { + if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -637,11 +675,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic capacity, Option.UNRELIABLE); } - - if (!nowait) - { - sync(); - } + sync(); } /** @@ -653,7 +687,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, - getProtocolHandler(), producerId, immediate, mandatory); + producerId, immediate, mandatory); } catch (AMQException e) { @@ -673,26 +707,25 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * creates an exchange if it does not already exist */ - public void sendExchangeDeclare(final AMQShortString name, - final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) - throws AMQException, FailoverException + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { - sendExchangeDeclare(name.asString(), type.asString(), null, null, - nowait); + //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it + sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, - final boolean nowait) throws AMQException + final boolean nowait, boolean durable, boolean autoDelete) throws AMQException { getQpidSession().exchangeDeclare( name, type, alternateExchange, args, - name.toString().startsWith("amq.") ? Option.PASSIVE - : Option.NONE); + name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE, + durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE); // We need to sync so that we get notify of an error. if (!nowait) { @@ -717,18 +750,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait, boolean passive) - throws AMQException, FailoverException - { - // do nothing this is only used by 0_8 - } - - /** - * Declare a queue with the given queueName - */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait, boolean passive) + public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, boolean passive) throws AMQException { AMQShortString queueName; @@ -759,7 +782,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - QueueNode node = (QueueNode)amqd.getSourceNode(); + // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java + Node node = amqd.getNode(); Map<String,Object> arguments = new HashMap<String,Object>(); arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) @@ -925,12 +949,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return getCurrentException(); } + @Override protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -947,7 +970,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive); + return send0_10QueueDeclare(amqd, noLocal, nowait, passive); } }, getAMQConnection()).execute(); } @@ -1072,11 +1095,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } - public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); match = !result.getNotFound(); + Node node = dest.getNode(); if (match) { @@ -1086,16 +1110,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) && (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (node.getExchangeType() != null) - { - // even if assert is false, better to verify this - match = node.getExchangeType().equals(result.getType()); - if (!match) - { - _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + - " actual " + result.getType()); - } } else { @@ -1104,18 +1118,27 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setExchangeClass(new AMQShortString(result.getType())); } } - + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + } + } + return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { boolean match = true; try { QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); match = dest.getAddressName().equals(result.getQueue()); - + Node node = dest.getNode(); + if (match && assertNode) { match = (result.getDurable() == node.isDurable()) && @@ -1123,9 +1146,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (result.getExclusive() == node.isExclusive()) && (matchProps(result.getArguments(),node.getDeclareArgs())); } - else if (match) + + if (assertNode) { - // should I use the queried details to update the local data structure. + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + } } } catch(SessionException e) @@ -1140,7 +1167,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "Error querying queue",e); } } - return match; } @@ -1179,17 +1205,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ @SuppressWarnings("deprecation") - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { - if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) - { - createSubscriptionQueue(dest,noLocal); - } + return; } else { @@ -1209,46 +1231,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { case AMQDestination.QUEUE_TYPE: { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) + if(createNode) { - setLegacyFiledsForQueueType(dest); + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); break; } - else if(createNode) + else if (isQueueExist(dest,assertNode)) { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,noLocal,noWait, false); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + setLegacyFieldsForQueueType(dest); break; - } + } } case AMQDestination.TOPIC_TYPE: { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) + if(createNode) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest, noLocal); - } + handleExchangeNodeCreation(dest); break; } - else if(createNode) + else if (isExchangeExist(dest,assertNode)) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest,noLocal); - } break; } } @@ -1287,7 +1295,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw new AMQException("Ambiguous address, please specify queue or topic as node type"); } dest.setAddressType(type); - dest.rebuildTargetAndSourceNodes(type); return type; } } @@ -1309,30 +1316,45 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } } - - private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException + + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { - QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - - if (dest.getQueueName() == null) + Link link = dest.getLink(); + String queueName = dest.getQueueName(); + + if (queueName == null) { - if (dest.getLink() != null && dest.getLink().getName() != null) - { - dest.setQueueName(new AMQShortString(dest.getLink().getName())); - } + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); + } + + SubscriptionQueue queueProps = link.getSubscriptionQueue(); + Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); } - node.setExclusive(true); - node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,noLocal,true, false); - getQpidSession().exchangeBind(dest.getQueueName(), - dest.getAddressName(), - dest.getSubject(), - Collections.<String,Object>emptyMap()); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + + getQpidSession().queueDeclare(queueName, + queueProps.getAlternateExchange(), arguments, + queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + Map<String,Object> bindingArguments = new HashMap<String, Object>(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + getQpidSession().exchangeBind(queueName, + dest.getAddressName(), + dest.getSubject(), + bindingArguments); } - - public void setLegacyFiledsForQueueType(AMQDestination dest) + + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support dest.setQueueName(new AMQShortString(dest.getAddressName())); @@ -1345,7 +1367,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // legacy support dest.setExchangeName(new AMQShortString(dest.getAddressName())); - ExchangeNode node = (ExchangeNode)dest.getTargetNode(); + Node node = dest.getNode(); dest.setExchangeClass(node.getExchangeType() == null? ExchangeDefaults.TOPIC_EXCHANGE_CLASS: new AMQShortString(node.getExchangeType())); @@ -1424,6 +1446,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return _qpidSession.isFlowBlocked(); } + @Override + public void setFlowControl(boolean active) + { + // Supported by 0-8..0-9-1 only + throw new UnsupportedOperationException("Operation not supported by this protocol"); + } + private void cancelTimerTask() { if (flushTask != null) @@ -1432,5 +1461,148 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } } -} + private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + { + Node node = dest.getNode(); + Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + getQpidSession().queueDeclare(dest.getAddressName(), + node.getAlternateExchange(), arguments, + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + sendExchangeDeclare(dest.getAddressName(), + node.getExchangeType(), + node.getAlternateExchange(), + node.getDeclareArgs(), + false, + node.isDurable(), + node.isAutoDelete()); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + void createBindings(AMQDestination dest, List<Binding> bindings) + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeUnbind(queue, exchange, + binding.getBindingKey()); + } + } + + void deleteSubscriptionQueue(AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest, false)) + { + getQpidSession().queueDelete(dest.getQueueName()); + } + } + + void handleNodeDelete(AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + getQpidSession().exchangeDelete(dest.getAddressName()); + } + } + else + { + if (isQueueExist(dest,false)) + { + getQpidSession().queueDelete(dest.getAddressName()); + } + } + } +} |