diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java | 86 |
1 files changed, 81 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 784c33cf02..d1ca6adb60 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.client.message; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import javax.jms.JMSException; -import javax.jms.Session; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * This abstract class provides exchange lookup functionality that is shared * between all MessageDelegates. Update facilities are provided so that the 0-10 @@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { + private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class); private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>(); private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>(); @@ -222,6 +228,76 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { return _session; } + + protected Destination convertToAddressBasedDestination(String exchange, + String routingKey, + String subject, + boolean useNodeTypeForDestinationType, + int type) + { + String addr; + boolean isQueue = true; + if ("".equals(exchange)) // type Queue + { + subject = (subject == null) ? "" : "/" + subject; + addr = routingKey + subject; + + } + else + { + addr = exchange + "/" + routingKey; + isQueue = false; + } + + if(useNodeTypeForDestinationType) + { + if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange)) + { + type = AMQDestination.QUEUE_TYPE; + } + + switch(type) + { + case AMQDestination.QUEUE_TYPE: + addr = addr + " ; { node: { type: queue } } "; + break; + case AMQDestination.TOPIC_TYPE: + addr = addr + " ; { node: { type: topic } } "; + break; + default: + // do nothing + } + } + + + try + { + AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr, + useNodeTypeForDestinationType); + if (isQueue) + { + dest.setQueueName(new AMQShortString(routingKey)); + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString("")); + } + else + { + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString(exchange)); + } + return dest; + } + catch(Exception e) + { + // An exception is only thrown here if the address syntax is invalid. + // Logging the exception, but not throwing as this is only important to Qpid developers. + // An exception here means a bug in the code. + _logger.error("Exception when constructing an address string from the ReplyTo struct"); + + // falling back to the old way of doing it to ensure the application continues. + return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + } + } } class ExchangeInfo |