diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 439 |
1 files changed, 49 insertions, 390 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 3812e612aa..40c9113a2d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -23,11 +23,8 @@ import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -37,8 +34,6 @@ import javax.jms.Destination; import javax.jms.JMSException; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; @@ -47,18 +42,15 @@ import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Link.Reliability; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.messaging.address.AddressResolver; +import org.apache.qpid.messaging.address.amqp_0_10.SubscriptionSettings_0_10; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.ExchangeQueryResult; -import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; @@ -143,7 +135,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * USed to store the range of in tx messages */ private RangeSet _txRangeSet = new RangeSet(); - private int _txSize = 0; + private int _txSize = 0; + private AddressResolver addressResolver; //--- constructors /** @@ -345,31 +338,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - List<Binding> bindings = new ArrayList<Binding>(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); - - String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? - destination.getAddressName(): "amq.topic"; - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - queueName.asString(): binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchange : - binding.getExchange(); - - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + // do nothing atm + // when creating a producer or consumer the create/assert method should be invokved + // for consumers create and delete subscriptions should be called in the constructor and close() + // when closing them the delete method should be invoked } if (!nowait) @@ -573,45 +545,51 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { boolean preAcquire; - long capacity = getCapacity(consumer.getDestination()); + long capacity; + try + { + capacity = consumer.getDestination().getConsumerCapacity(this); + } + catch (Exception e1) + { + AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR, "Error retrieving capacity",e1); + throw ex; + } try { - boolean isTopic; + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { - isTopic = consumer.getDestination() instanceof AMQTopic || + boolean isTopic = consumer.getDestination() instanceof AMQTopic || consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ; preAcquire = isTopic || (!consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""))); + + getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, + null, 0, arguments, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } else { - isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE; - + AddressBasedDestination dest = (AddressBasedDestination)consumer.getDestination(); preAcquire = !consumer.isNoConsume() && - (isTopic || consumer.getMessageSelector() == null || + (dest.isTopic() || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); - arguments.putAll( - (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); + SubscriptionSettings_0_10 settings = new SubscriptionSettings_0_10(); + settings.setAcceptMode(acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT); + settings.setAccquireMode(preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED); + settings.setArgs(arguments); + settings.setMessageSelector(messageSelector); + settings.setSubscriptionTag(String.valueOf(tag)); + dest.createSubscription(this,settings); } - - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } - - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { @@ -646,21 +624,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private long getCapacity(AMQDestination destination) - { - long capacity = 0; - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (prefetch()) - { - capacity = getAMQConnection().getMaxPrefetch(); - } - return capacity; - } - /** * Create an 0_10 message producer */ @@ -775,12 +738,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - QueueNode node = (QueueNode)amqd.getSourceNode(); - getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() , - node.getDeclareArgs(), - node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, - node.isDurable() ? Option.DURABLE : Option.NONE, - node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + // do nothing } // passive --> false @@ -825,7 +783,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //only set if msg list is null try { - long capacity = getCapacity(consumer.getDestination()); + long capacity = consumer.getDestination().getConsumerCapacity(this); if (capacity == 0) { @@ -1056,317 +1014,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return AMQMessageDelegateFactory.FACTORY_0_10; } - - public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) - { - boolean match = true; - ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); - match = !result.getNotFound(); - - if (match) - { - if (assertNode) - { - match = (result.getDurable() == node.isDurable()) && - (node.getExchangeType() != null && - node.getExchangeType().equals(result.getType())) && - (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (node.getExchangeType() != null) - { - // even if assert is false, better to verify this - match = node.getExchangeType().equals(result.getType()); - if (!match) - { - _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + - " actual " + result.getType()); - } - } - else - { - _logger.debug("Setting Exchange type " + result.getType()); - node.setExchangeType(result.getType()); - dest.setExchangeClass(new AMQShortString(result.getType())); - } - } - - return match; - } - - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException - { - boolean match = true; - try - { - QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); - match = dest.getAddressName().equals(result.getQueue()); - - if (match && assertNode) - { - match = (result.getDurable() == node.isDurable()) && - (result.getAutoDelete() == node.isAutoDelete()) && - (result.getExclusive() == node.isExclusive()) && - (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (match) - { - // should I use the queried details to update the local data structure. - } - } - catch(SessionException e) - { - if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) - { - match = false; - } - else - { - throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), - "Error querying queue",e); - } - } - - return match; - } - - private boolean matchProps(Map<String,Object> target,Map<String,Object> source) - { - boolean match = true; - for (String key: source.keySet()) - { - match = target.containsKey(key) && - target.get(key).equals(source.get(key)); - - if (!match) - { - StringBuffer buf = new StringBuffer(); - buf.append("Property given in address did not match with the args sent by the broker."); - buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); - buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }"); - _logger.debug(buf.toString()); - return match; - } - } - - return match; - } - - /** - * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, - * 2.1 verify queue exists or create if create == true - * 2.2 If not throw exception - * - * 3. if type == exchange, - * 3.1 verify exchange exists or create if create == true - * 3.2 if not throw exception - * 3.3 if exchange exists (or created) create subscription queue. - */ - - @SuppressWarnings("deprecation") - public void handleAddressBasedDestination(AMQDestination dest, - boolean isConsumer, - boolean noWait) throws AMQException - { - if (dest.isAddressResolved()) - { - if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) - { - createSubscriptionQueue(dest); - } - } - else - { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - if (type == AMQDestination.QUEUE_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.AT_LEAST_ONCE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.UNSPECIFIED) - { - dest.getLink().setReliability(Reliability.UNRELIABLE); - } - else if (type == AMQDestination.TOPIC_TYPE && - dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE) - { - throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics"); - } - - switch (type) - { - case AMQDestination.QUEUE_TYPE: - { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) - { - setLegacyFiledsForQueueType(dest); - break; - } - else if(createNode) - { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,false,noWait); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); - break; - } - } - - case AMQDestination.TOPIC_TYPE: - { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest); - } - break; - } - else if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest); - } - break; - } - } - - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); - } - dest.setAddressResolved(true); - } - } - - public int resolveAddressType(AMQDestination dest) throws AMQException - { - int type = dest.getAddressType(); - String name = dest.getAddressName(); - if (type != AMQDestination.UNKNOWN_TYPE) - { - return type; - } - else - { - ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get(); - if (result.getQueueNotFound() && result.getExchangeNotFound()) { - //neither a queue nor an exchange exists with that name; treat it as a queue - type = AMQDestination.QUEUE_TYPE; - } else if (result.getExchangeNotFound()) { - //name refers to a queue - type = AMQDestination.QUEUE_TYPE; - } else if (result.getQueueNotFound()) { - //name refers to an exchange - type = AMQDestination.TOPIC_TYPE; - } else { - //both a queue and exchange exist for that name - throw new AMQException("Ambiguous address, please specify queue or topic as node type"); - } - dest.setAddressType(type); - dest.rebuildTargetAndSourceNodes(type); - return type; - } - } - - private void verifySubject(AMQDestination dest) throws AMQException - { - if (dest.getSubject() == null || dest.getSubject().trim().equals("")) - { - - if ("topic".equals(dest.getExchangeClass().toString())) - { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); - } - else - { - dest.setRoutingKey(new AMQShortString("")); - dest.setSubject(""); - } - } - } - - private void createSubscriptionQueue(AMQDestination dest) throws AMQException - { - QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - - if (dest.getQueueName() == null) - { - if (dest.getLink() != null && dest.getLink().getName() != null) - { - dest.setQueueName(new AMQShortString(dest.getLink().getName())); - } - } - node.setExclusive(true); - node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,false,true); - node.addBinding(new Binding(dest.getAddressName(), - dest.getQueueName(),// should have one by now - dest.getSubject(), - Collections.<String,Object>emptyMap())); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); - } - - public void setLegacyFiledsForQueueType(AMQDestination dest) - { - // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - - public void setLegacyFiledsForTopicType(AMQDestination dest) - { - // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); - ExchangeNode node = (ExchangeNode)dest.getTargetNode(); - dest.setExchangeClass(node.getExchangeType() == null? - ExchangeDefaults.TOPIC_EXCHANGE_CLASS: - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - } - - /** This should be moved to a suitable utility class */ - private String printMap(Map<String,Object> map) - { - StringBuilder sb = new StringBuilder(); - sb.append("<"); - if (map != null) - { - for(String key : map.keySet()) - { - sb.append(key).append(" = ").append(map.get(key)).append(" "); - } - } - sb.append(">"); - return sb.toString(); - } protected void acknowledgeImpl() { @@ -1389,4 +1036,16 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _highestDeliveryTag.set(-1); super.resubscribe(); } + + public boolean isQueueExist(String queue) + { + QueueQueryResult result = _qpidSession.queueQuery(queue, Option.NONE).get(); + return queue.equals(result.getQueue()); + } + + public boolean isExchangeExist(String exchange) + { + ExchangeQueryResult result = _qpidSession.exchangeQuery(exchange, Option.NONE).get(); + return !result.getNotFound(); + } } |