summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-09-05 02:20:53 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-09-05 02:20:53 +0000
commit19b4b9894848e54aabf8714da33178eb367337e4 (patch)
tree59fde7cf54b3c089f227a5395d72283c974d9d61
parent4de82dc6e9215f1e54083c130a6bbeb1c3c6ed4a (diff)
downloadqpid-python-19b4b9894848e54aabf8714da33178eb367337e4.tar.gz
QPID-3466 Added logic to convert exchange/replyTo pairs into address
strings. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1165148 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java61
1 files changed, 58 insertions, 3 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 182b7b65d8..2793411f74 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -47,6 +47,7 @@ import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.Future;
@@ -55,6 +56,8 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This extends AbstractAMQMessageDelegate which contains common code between
@@ -63,6 +66,7 @@ import org.apache.qpid.transport.ReplyTo;
*/
public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class);
private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -95,8 +99,22 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
AMQDestination dest;
- dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+ if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+ dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
new AMQShortString(_deliveryProps.getRoutingKey()));
+ }
+ else
+ {
+ String subject = null;
+ if (messageProps != null && messageProps.getApplicationHeaders() != null)
+ {
+ subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT);
+ }
+ dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
+ _deliveryProps.getRoutingKey(), subject);
+ }
+
setJMSDestination(dest);
}
@@ -242,13 +260,50 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+
+ dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ else
+ {
+ dest = convertToAddressBasedDestination(exchange,routingKey,null);
+ }
_destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
return dest;
}
}
+
+ private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
+ {
+ String addr;
+ if ("".equals(exchange)) // type Queue
+ {
+ subject = (subject == null) ? "" : "/" + subject;
+ addr = routingKey + subject;
+ }
+ else
+ {
+ addr = exchange + "/" + routingKey;
+ }
+
+ try
+ {
+ return AMQDestination.createDestination("ADDR:" + addr.toString() + ";{create: always}");
+ }
+ catch(Exception e)
+ {
+ // An exception is only thrown here if the address syntax is invalid.
+ // Logging the exception, but not throwing as this is only important to Qpid developers.
+ // An exception here means a bug in the code.
+ _logger.error("Exception when constructing an address string from the ReplyTo struct");
+
+ // falling back to the old way of doing it to ensure the application continues.
+ return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ }
public void setJMSReplyTo(Destination destination) throws JMSException
{
@@ -337,7 +392,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
Destination replyTo = getJMSReplyTo();
if(replyTo != null)
{
- return ((AMQDestination)replyTo).toURL();
+ return ((AMQDestination)replyTo).toString();
}
else
{