summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java86
1 files changed, 81 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 784c33cf02..d1ca6adb60 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.client.message;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* This abstract class provides exchange lookup functionality that is shared
* between all MessageDelegates. Update facilities are provided so that the 0-10
@@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class);
private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>();
@@ -222,6 +228,76 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
return _session;
}
+
+ protected Destination convertToAddressBasedDestination(String exchange,
+ String routingKey,
+ String subject,
+ boolean useNodeTypeForDestinationType,
+ int type)
+ {
+ String addr;
+ boolean isQueue = true;
+ if ("".equals(exchange)) // type Queue
+ {
+ subject = (subject == null) ? "" : "/" + subject;
+ addr = routingKey + subject;
+
+ }
+ else
+ {
+ addr = exchange + "/" + routingKey;
+ isQueue = false;
+ }
+
+ if(useNodeTypeForDestinationType)
+ {
+ if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange))
+ {
+ type = AMQDestination.QUEUE_TYPE;
+ }
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ addr = addr + " ; { node: { type: queue } } ";
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ addr = addr + " ; { node: { type: topic } } ";
+ break;
+ default:
+ // do nothing
+ }
+ }
+
+
+ try
+ {
+ AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr,
+ useNodeTypeForDestinationType);
+ if (isQueue)
+ {
+ dest.setQueueName(new AMQShortString(routingKey));
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(""));
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(exchange));
+ }
+ return dest;
+ }
+ catch(Exception e)
+ {
+ // An exception is only thrown here if the address syntax is invalid.
+ // Logging the exception, but not throwing as this is only important to Qpid developers.
+ // An exception here means a bug in the code.
+ _logger.error("Exception when constructing an address string from the ReplyTo struct");
+
+ // falling back to the old way of doing it to ensure the application continues.
+ return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ }
}
class ExchangeInfo