diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:35:10 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:35:10 +0000 |
commit | 522a81e02faf3e66e25161655927acf8454aa05e (patch) | |
tree | bc8d2256d815c6b4c599813705b0dc253c3a8c9a /qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | |
parent | f9592e0d891b2eca8b91e06e6da0f8cd6c15f24c (diff) | |
download | qpid-python-522a81e02faf3e66e25161655927acf8454aa05e.tar.gz |
Merging from trunk r1619093:1620329 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 199 |
1 files changed, 24 insertions, 175 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 19720ea386..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; @@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; @@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); + doBind(destination, binding, queue, exchange); } } @@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean nowait, int tag) throws AMQException, FailoverException { - if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) - { - if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) - { - String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); - - createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); - queueName = consumer.getDestination().getAMQQueueName(); - consumer.setQueuename(queueName); - } - handleLinkCreation(consumer.getDestination()); - } + queueName = preprocessAddressTopic(consumer, queueName); boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + sendExchangeDeclare(name.asString(), type.asString(), null, + arguments == null ? null : FieldTableSupport.convertToMap(arguments), + nowait, durable, autoDelete); + } + + public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, final boolean nowait, boolean durable, boolean autoDelete) throws AMQException @@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } + @Override public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; @@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } + @Override public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { Node node = dest.getNode(); @@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - /** - * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, - * 2.1 verify queue exists or create if create == true - * 2.2 If not throw exception - * - * 3. if type == exchange, - * 3.1 verify exchange exists or create if create == true - * 3.2 if not throw exception - * 3.3 if exchange exists (or created) create subscription queue. - */ - - @SuppressWarnings("deprecation") - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException - { - if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) - { - return; - } - else - { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - switch (type) - { - case AMQDestination.QUEUE_TYPE: - { - if(createNode) - { - setLegacyFieldsForQueueType(dest); - handleQueueNodeCreation(dest,noLocal); - break; - } - else if (isQueueExist(dest,assertNode)) - { - setLegacyFieldsForQueueType(dest); - break; - } - } - - case AMQDestination.TOPIC_TYPE: - { - if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - handleExchangeNodeCreation(dest); - break; - } - else if (isExchangeExist(dest,assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - break; - } - } - - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); - } - dest.setAddressResolved(System.currentTimeMillis()); - } - } - + @Override public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); @@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void verifySubject(AMQDestination dest) throws AMQException - { - if (dest.getSubject() == null || dest.getSubject().trim().equals("")) - { - - if ("topic".equals(dest.getExchangeClass().toString())) - { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); - } - else - { - dest.setRoutingKey(new AMQShortString("")); - dest.setSubject(""); - } - } - } - + @Override void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { Link link = dest.getLink(); @@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic bindingArguments); } - public void setLegacyFieldsForQueueType(AMQDestination dest) - { - // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - - public void setLegacyFiledsForTopicType(AMQDestination dest) - { - // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); - Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? - AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - } - protected void acknowledgeImpl() { RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); @@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + @Override + protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException { Node node = dest.getNode(); Map<String,Object> arguments = node.getDeclareArgs(); @@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } + @Override void handleExchangeNodeCreation(AMQDestination dest) throws AMQException { Node node = dest.getNode(); @@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } - void handleLinkCreation(AMQDestination dest) throws AMQException - { - createBindings(dest, dest.getLink().getBindings()); - } - - void createBindings(AMQDestination dest, List<Binding> bindings) + protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange) { - String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest - .getAddressName() : "amq.topic"; - - String defaultQueueName = null; - if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) - { - defaultQueueName = dest.getQueueName(); - } - else - { - defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); - } - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - defaultQueueName: binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchangeForBinding : - binding.getExchange(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + Strings.printMap(binding.getArgs())); - } - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); } void handleLinkDelete(AMQDestination dest) throws AMQException @@ -1614,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + @Override void handleNodeDelete(AMQDestination dest) throws AMQException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) |