summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java122
1 files changed, 32 insertions, 90 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 8b4488f1f9..1479f06632 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.nclient.*;
import org.apache.qpid.jms.Message;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
@@ -42,12 +41,13 @@ import javax.jms.MessageNotWriteableException;
import javax.jms.MessageFormatException;
import javax.jms.DeliveryMode;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import org.apache.qpid.exchange.ExchangeDefaults;
-public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
+/**
+ * This extends AbstractAMQMessageDelegate which contains common code between
+ * both the 0_8 and 0_10 Message types.
+ *
+ */
+public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
@@ -65,27 +65,6 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
private AMQSession _session;
private final long _deliveryTag;
- private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>();
- private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>();
- private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();;
-
- static
- {
- _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
- _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
-
- _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
- _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
- _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
- _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-
-
- _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE);
- _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
- _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
- }
protected AMQMessageDelegate_0_10()
{
@@ -93,87 +72,49 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
_readableProperties = false;
}
- protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange,
- AMQShortString routingKey) throws AMQException
+ protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
{
- this(messageProps, deliveryProps, deliveryTag);
-
-
- AMQDestination dest;
-
- dest = generateDestination(exchange, routingKey);
- setJMSDestination(dest);
- }
+ _messageProps = messageProps;
+ _deliveryProps = deliveryProps;
+ _deliveryTag = deliveryTag;
+ _readableProperties = (_messageProps != null);
- private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey)
- {
AMQDestination dest;
- switch(getExchangeType(exchange))
- {
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
-
- }
-
- return dest;
- }
-
- private int getExchangeType(AMQShortString exchange)
- {
- Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange);
-
- if(type == null)
- {
- return AMQDestination.UNKNOWN_TYPE;
- }
-
- return type;
+ dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+ new AMQShortString(_deliveryProps.getRoutingKey()));
+ setJMSDestination(dest);
}
-
- public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session)
+ /**
+ * Use the 0-10 ExchangeQuery call to validate the exchange type.
+ *
+ * This is used primarily to provide the correct JMSDestination value.
+ *
+ * The query is performed synchronously iff the map exchange is not already
+ * present in the exchange Map.
+ *
+ * @param header The message headers, from which the exchange name can be extracted
+ * @param session The 0-10 session to use to call ExchangeQuery
+ */
+ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session)
{
DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
- if(deliveryProps != null)
+ if (deliveryProps != null)
{
String exchange = deliveryProps.getExchange();
- if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange))
+ if (exchange != null && !exchangeMapContains(exchange))
{
-
- AMQShortString exchangeShortString = new AMQShortString(exchange);
Future<ExchangeQueryResult> future =
- session.exchangeQuery(exchange.toString());
+ session.exchangeQuery(exchange.toString());
ExchangeQueryResult res = future.get();
- Integer type = _exchangeTypeToDestinationType.get(res.getType());
- if(type == null)
- {
- type = AMQDestination.UNKNOWN_TYPE;
- }
- _exchangeTypeStringMap.put(exchange, type);
- _exchangeTypeMap.put(exchangeShortString, type);
-
+ updateExchangeType(exchange, res.getType());
}
}
}
- protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
- {
- _messageProps = messageProps;
- _deliveryProps = deliveryProps;
- _deliveryTag = deliveryTag;
- _readableProperties = (_messageProps != null);
-
- }
-
public String getJMSMessageID() throws JMSException
{
@@ -287,7 +228,8 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
{
if (destination == null)
{
- throw new IllegalArgumentException("Null destination not allowed");
+ _messageProps.setReplyTo(null);
+ return;
}
if (!(destination instanceof AMQDestination))