diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java | 122 |
1 files changed, 32 insertions, 90 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 8b4488f1f9..1479f06632 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -29,7 +29,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.nclient.*; import org.apache.qpid.jms.Message; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; @@ -42,12 +41,13 @@ import javax.jms.MessageNotWriteableException; import javax.jms.MessageFormatException; import javax.jms.DeliveryMode; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import org.apache.qpid.exchange.ExchangeDefaults; -public class AMQMessageDelegate_0_10 implements AMQMessageDelegate +/** + * This extends AbstractAMQMessageDelegate which contains common code between + * both the 0_8 and 0_10 Message types. + * + */ +public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap()); @@ -65,27 +65,6 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate private AMQSession _session; private final long _deliveryTag; - private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>(); - private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>(); - private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();; - - static - { - _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); - _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); - - _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE); - _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE); - _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - - - _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - } protected AMQMessageDelegate_0_10() { @@ -93,87 +72,49 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate _readableProperties = false; } - protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange, - AMQShortString routingKey) throws AMQException + protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) { - this(messageProps, deliveryProps, deliveryTag); - - - AMQDestination dest; - - dest = generateDestination(exchange, routingKey); - setJMSDestination(dest); - } + _messageProps = messageProps; + _deliveryProps = deliveryProps; + _deliveryTag = deliveryTag; + _readableProperties = (_messageProps != null); - private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey) - { AMQDestination dest; - switch(getExchangeType(exchange)) - { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - - } - - return dest; - } - - private int getExchangeType(AMQShortString exchange) - { - Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange); - - if(type == null) - { - return AMQDestination.UNKNOWN_TYPE; - } - - return type; + dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()), + new AMQShortString(_deliveryProps.getRoutingKey())); + setJMSDestination(dest); } - - public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session) + /** + * Use the 0-10 ExchangeQuery call to validate the exchange type. + * + * This is used primarily to provide the correct JMSDestination value. + * + * The query is performed synchronously iff the map exchange is not already + * present in the exchange Map. + * + * @param header The message headers, from which the exchange name can be extracted + * @param session The 0-10 session to use to call ExchangeQuery + */ + public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session) { DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); - if(deliveryProps != null) + if (deliveryProps != null) { String exchange = deliveryProps.getExchange(); - if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange)) + if (exchange != null && !exchangeMapContains(exchange)) { - - AMQShortString exchangeShortString = new AMQShortString(exchange); Future<ExchangeQueryResult> future = - session.exchangeQuery(exchange.toString()); + session.exchangeQuery(exchange.toString()); ExchangeQueryResult res = future.get(); - Integer type = _exchangeTypeToDestinationType.get(res.getType()); - if(type == null) - { - type = AMQDestination.UNKNOWN_TYPE; - } - _exchangeTypeStringMap.put(exchange, type); - _exchangeTypeMap.put(exchangeShortString, type); - + updateExchangeType(exchange, res.getType()); } } } - protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) - { - _messageProps = messageProps; - _deliveryProps = deliveryProps; - _deliveryTag = deliveryTag; - _readableProperties = (_messageProps != null); - - } - public String getJMSMessageID() throws JMSException { @@ -287,7 +228,8 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate { if (destination == null) { - throw new IllegalArgumentException("Null destination not allowed"); + _messageProps.setReplyTo(null); + return; } if (!(destination instanceof AMQDestination)) |