summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-09-02 16:05:58 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-09-02 16:05:58 +0000
commite514c426b243b53ede124fbe0ee8077be26f3b5b (patch)
tree9f3eae5fa1267134ba82ae3695546861944214b3
parent2eeebe50b2d779b019c7a28a47ac8c5604636a2b (diff)
downloadqpid-python-e514c426b243b53ede124fbe0ee8077be26f3b5b.tar.gz
Merged QPID-6052 (r1621143 r1621148 r1621149 r1621150 r1621826) to 0.30 from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1622044 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java52
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java83
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java86
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java415
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java2
-rw-r--r--qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java2
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java9
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java6
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java2
25 files changed, 421 insertions, 407 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index 0e1658d2b3..57f8343874 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -58,7 +58,7 @@ public class AMQAnyDestination extends AMQDestination implements Queue, Topic
super(str);
}
- public AMQAnyDestination(Address addr) throws Exception
+ public AMQAnyDestination(Address addr)
{
super(addr);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 2714caf2a1..a71555480f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -211,7 +211,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
{
}
- protected AMQDestination(Address address) throws Exception
+ protected AMQDestination(Address address)
{
this._address = address;
getInfoFromAddress();
@@ -749,7 +749,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
}
}
- public static Destination createDestination(String str) throws Exception
+ public static Destination createDestination(String str, final boolean useNodeTypeForDestinationType)
+ throws URISyntaxException
{
DestSyntax syntax = getDestType(str);
str = stripSyntaxPrefix(str);
@@ -760,7 +761,24 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
else
{
Address address = createAddressFromString(str);
- return new AMQAnyDestination(address);
+ if(useNodeTypeForDestinationType)
+ {
+ AddressHelper helper = new AddressHelper(address);
+ switch(helper.getNodeType())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ return new AMQQueue(address);
+ case AMQDestination.TOPIC_TYPE:
+ return new AMQTopic(address);
+ default:
+ return new AMQAnyDestination(address);
+ }
+
+ }
+ else
+ {
+ return new AMQAnyDestination(address);
+ }
}
}
@@ -912,7 +930,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
return Address.parse(str);
}
- private void getInfoFromAddress() throws Exception
+ private void getInfoFromAddress()
{
_name = _address.getName();
_subject = _address.getSubject();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index f65ecece2b..9b00883dfb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
+import javax.jms.Queue;
+
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-
public class AMQQueue extends AMQDestination implements Queue
{
private static final long serialVersionUID = -1283142598932655606L;
@@ -36,6 +38,11 @@ public class AMQQueue extends AMQDestination implements Queue
super();
}
+ public AMQQueue(Address address)
+ {
+ super(address);
+ }
+
public AMQQueue(String address) throws URISyntaxException
{
super(address);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0183c30276..3562a10f27 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1423,7 +1423,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
Queue dest = validateQueue(destination);
C consumer = (C) createConsumer(dest);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1442,7 +1442,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
Queue dest = validateQueue(destination);
C consumer = (C) createConsumer(dest, messageSelector);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1460,7 +1460,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1479,7 +1479,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest, messageSelector);
-
+ consumer.setAddressType(AMQDestination.QUEUE_TYPE);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -2589,7 +2589,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+ if (!(topic instanceof AMQDestination))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 0145d15111..fbf8b27630 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -764,7 +764,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache,
+ _topicDestinationCache, AMQDestination.UNKNOWN_TYPE);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 1a7b6bea80..9cdbc1e189 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -20,16 +20,16 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.url.BindingURL;
+import java.net.URISyntaxException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
-import java.net.URISyntaxException;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.url.BindingURL;
public class AMQTopic extends AMQDestination implements Topic
{
@@ -40,7 +40,7 @@ public class AMQTopic extends AMQDestination implements Topic
super(address);
}
- public AMQTopic(Address address) throws Exception
+ public AMQTopic(Address address)
{
super(address);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 187be8522c..3d0e972ca2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -136,6 +136,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
private List<StackTraceElement> _closedStack = null;
private boolean _isDurableSubscriber = false;
+ private int _addressType = AMQDestination.UNKNOWN_TYPE;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -203,7 +204,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
_arguments = ft;
-
+ _addressType = _destination.getAddressType();
}
public AMQDestination getDestination()
@@ -1066,4 +1067,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_isDurableSubscriber = true;
}
+
+ void setAddressType(final int addressType)
+ {
+ _addressType = addressType;
+ }
+
+ int getAddressType()
+ {
+ return _addressType;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cdffc73932..459030a10d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -151,7 +151,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(), messageFrame.getExchange() == null ? AMQShortString.EMPTY_STRING : messageFrame.getExchange(),
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
- _queueDestinationCache, _topicDestinationCache);
+ _queueDestinationCache, _topicDestinationCache, getAddressType());
}
@@ -164,4 +164,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
{
return _rejectBehaviour;
}
+
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 9bdef22f96..efe91f6797 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQException;
-
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.AMQException;
+
/**
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
@@ -43,6 +43,7 @@ class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSub
_topic = topic;
_consumer = consumer;
_noLocal = noLocal;
+ consumer.setAddressType(AMQDestination.TOPIC_TYPE);
}
TopicSubscriberAdaptor(Topic topic, C consumer)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index bd089eb6a8..fa0a6be91c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -129,11 +129,11 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
if (subject != null)
{
messageProps.getApplicationHeaders().remove(QpidMessageProperties.QPID_SUBJECT);
- messageProps.getApplicationHeaders().put("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER,subject);
+ messageProps.getApplicationHeaders().put(QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY,subject);
}
}
dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
- _deliveryProps.getRoutingKey(), subject);
+ _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
}
setJMSDestination(dest);
@@ -280,7 +280,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
else
{
- dest = convertToAddressBasedDestination(exchange,routingKey,null);
+ dest = convertToAddressBasedDestination(exchange,routingKey,null, false,
+ AMQDestination.UNKNOWN_TYPE);
}
_destinationCache.put(replyTo, dest);
}
@@ -288,49 +289,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
return dest;
}
}
-
- private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
- {
- String addr;
- boolean isQueue = true;
- if ("".equals(exchange)) // type Queue
- {
- subject = (subject == null) ? "" : "/" + subject;
- addr = routingKey + subject;
- }
- else
- {
- addr = exchange + "/" + routingKey;
- isQueue = false;
- }
-
- try
- {
- AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr);
- if (isQueue)
- {
- dest.setQueueName(new AMQShortString(routingKey));
- dest.setRoutingKey(new AMQShortString(routingKey));
- dest.setExchangeName(new AMQShortString(""));
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(routingKey));
- dest.setExchangeName(new AMQShortString(exchange));
- }
- return dest;
- }
- catch(Exception e)
- {
- // An exception is only thrown here if the address syntax is invalid.
- // Logging the exception, but not throwing as this is only important to Qpid developers.
- // An exception here means a bug in the code.
- _logger.error("Exception when constructing an address string from the ReplyTo struct");
-
- // falling back to the old way of doing it to ensure the application continues.
- return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
- }
- }
public void setJMSReplyTo(Destination destination) throws JMSException
{
@@ -747,7 +705,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
else if (isStrictJMS && QpidMessageProperties.QPID_SUBJECT.equals(propertyName))
{
- return (String)getApplicationHeaders().get("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER);
+ return (String)getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY);
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index d9d2cf85d3..486023e847 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -21,6 +21,19 @@
package org.apache.qpid.client.message;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -32,17 +45,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
@@ -63,6 +65,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
});
public static final String JMS_TYPE = "x-jms-type";
+ public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms");
private boolean _readableProperties = false;
@@ -96,7 +99,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
// Used when generating a received message object
protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
{
this(contentHeader, deliveryTag);
@@ -104,28 +108,46 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
AMQDestination dest = null;
- // If we have a type set the attempt to use that.
- if (type != null)
+ if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
{
- switch (type.intValue())
+ // If we have a type set the attempt to use that.
+ if (type != null)
{
- case AMQDestination.QUEUE_TYPE:
- dest = queueDestinationCache.getDestination(exchange, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = topicDestinationCache.getDestination(exchange, routingKey);
- break;
- default:
- // Use the generateDestination method
- dest = null;
+ switch (type.intValue())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = queueDestinationCache.getDestination(exchange, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = topicDestinationCache.getDestination(exchange, routingKey);
+ break;
+ default:
+ // Use the generateDestination method
+ dest = null;
+ }
}
- }
- if (dest == null)
+ if (dest == null)
+ {
+ dest = generateDestination(exchange, routingKey);
+ }
+ }
+ else
{
- dest = generateDestination(exchange, routingKey);
+ String subject = null;
+ if (contentHeader.getHeaders() != null
+ && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT))
+ {
+ subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT);
+ }
+ if(type == null)
+ {
+ type = addressType;
+ }
+ dest = (AMQDestination) convertToAddressBasedDestination(AMQShortString.toString(exchange),
+ AMQShortString.toString(routingKey), subject,
+ true, type);
}
-
setJMSDestination(dest);
}
@@ -484,7 +506,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
public Enumeration getPropertyNames() throws JMSException
{
- return getJmsHeaders().getPropertyNames();
+ Set<String> keys = getJmsHeaders().getPropertyNames();
+ return Collections.enumeration(keys);
}
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
@@ -556,7 +579,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
checkWritableProperties();
- getJmsHeaders().setDouble(propertyName, new Double(v));
+ getJmsHeaders().setDouble(propertyName, v);
}
public void setStringProperty(String propertyName, String value) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 784c33cf02..d1ca6adb60 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.client.message;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* This abstract class provides exchange lookup functionality that is shared
* between all MessageDelegates. Update facilities are provided so that the 0-10
@@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class);
private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
private static Map<String,ExchangeInfo> _exchangeMap = new ConcurrentHashMap<String, ExchangeInfo>();
@@ -222,6 +228,76 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
{
return _session;
}
+
+ protected Destination convertToAddressBasedDestination(String exchange,
+ String routingKey,
+ String subject,
+ boolean useNodeTypeForDestinationType,
+ int type)
+ {
+ String addr;
+ boolean isQueue = true;
+ if ("".equals(exchange)) // type Queue
+ {
+ subject = (subject == null) ? "" : "/" + subject;
+ addr = routingKey + subject;
+
+ }
+ else
+ {
+ addr = exchange + "/" + routingKey;
+ isQueue = false;
+ }
+
+ if(useNodeTypeForDestinationType)
+ {
+ if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange))
+ {
+ type = AMQDestination.QUEUE_TYPE;
+ }
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ addr = addr + " ; { node: { type: queue } } ";
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ addr = addr + " ; { node: { type: topic } } ";
+ break;
+ default:
+ // do nothing
+ }
+ }
+
+
+ try
+ {
+ AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr,
+ useNodeTypeForDestinationType);
+ if (isQueue)
+ {
+ dest.setQueueName(new AMQShortString(routingKey));
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(""));
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(routingKey));
+ dest.setExchangeName(new AMQShortString(exchange));
+ }
+ return dest;
+ }
+ catch(Exception e)
+ {
+ // An exception is only thrown here if the address syntax is invalid.
+ // Logging the exception, but not throwing as this is only important to Qpid developers.
+ // An exception here means a bug in the code.
+ _logger.error("Exception when constructing an address string from the ReplyTo struct");
+
+ // falling back to the old way of doing it to ensure the application continues.
+ return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ }
}
class ExchangeInfo
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 9748038b9b..65f4478baa 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -47,11 +47,14 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
- protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
+ protected AbstractJMSMessage create08MessageWithBody(long messageNbr,
+ ContentHeaderBody contentHeader,
+ AMQShortString exchange,
+ AMQShortString routingKey,
List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -117,7 +120,8 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
contentHeader.getProperties(),
- exchange, routingKey, queueDestinationCache, topicDestinationCache);
+ exchange, routingKey, queueDestinationCache,
+ topicDestinationCache, addressType);
return createMessage(delegate, data);
}
@@ -162,13 +166,15 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
return message;
}
+ @Override
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
AMQShortString exchange, AMQShortString routingKey, List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
throws JMSException, AMQException
{
- final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+ final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
msg.setJMSRedelivered(redelivered);
msg.setReceivedFromServer();
return msg;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
index 122a5c4ef2..e27b3d81cb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
@@ -20,77 +20,76 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Enumeration;
+
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.FieldTable;
public final class JMSHeaderAdapter
{
- private final FieldTable _headers;
+ private static final Map<String,String> AMQP_TO_JMS_HEADER_NAME_MAPPINGS;
+ private static final Map<String,String> JMS_TO_AMQP_HEADER_NAME_MAPPINGS;
- public JMSHeaderAdapter(FieldTable headers)
- {
- _headers = headers;
- }
- public FieldTable getHeaders()
+ static
{
- return _headers;
- }
+ String[][] mappings = {
+ { QpidMessageProperties.QPID_SUBJECT, QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY }
+ };
- public boolean getBoolean(String string) throws JMSException
- {
- checkPropertyName(string);
- Boolean b = getHeaders().getBoolean(string);
+ Map<String,String> amqpToJmsHeaderNameMappings = new HashMap<>();
+ Map<String,String> jmsToAmqpHeaderNameMappings = new HashMap<>();
- if (b == null)
+ for(String[] mapping : mappings)
{
- if (getHeaders().containsKey(string))
- {
- Object str = getHeaders().getObject(string);
-
- if (!(str instanceof String))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf((String) str);
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
+ amqpToJmsHeaderNameMappings.put(mapping[0], mapping[1]);
+ jmsToAmqpHeaderNameMappings.put(mapping[1], mapping[0]);
}
- return b;
+ AMQP_TO_JMS_HEADER_NAME_MAPPINGS = Collections.unmodifiableMap(amqpToJmsHeaderNameMappings);
+ JMS_TO_AMQP_HEADER_NAME_MAPPINGS = Collections.unmodifiableMap(jmsToAmqpHeaderNameMappings);
}
- public boolean getBoolean(AMQShortString string) throws JMSException
+ private static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms");
+
+
+ private final FieldTable _headers;
+
+ public JMSHeaderAdapter(FieldTable headers)
{
- checkPropertyName(string);
- Boolean b = getHeaders().getBoolean(string);
+ _headers = headers;
+ }
+
+
+ private String mapJmsToAmqpName(String name)
+ {
+ return JMS_TO_AMQP_HEADER_NAME_MAPPINGS.containsKey(name) ? JMS_TO_AMQP_HEADER_NAME_MAPPINGS.get(name) : name;
+ }
+
+ public boolean getBoolean(String name) throws JMSException
+ {
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Boolean b = _headers.getBoolean(amqpName);
if (b == null)
{
- if (getHeaders().containsKey(string))
+ if (_headers.containsKey(amqpName))
{
- Object str = getHeaders().getObject(string);
+ Object str = _headers.getObject(amqpName);
if (!(str instanceof String))
{
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ throw new MessageFormatException("getBoolean can't use " + name + " item.");
}
else
{
@@ -106,42 +105,16 @@ public final class JMSHeaderAdapter
return b;
}
- public char getCharacter(String string) throws JMSException
- {
- checkPropertyName(string);
- Character c = getHeaders().getCharacter(string);
- if (c == null)
- {
- if (getHeaders().isNullStringValue(string))
- {
- throw new NullPointerException("Cannot convert null char");
- }
- else
- {
- throw new MessageFormatException("getChar can't use " + string + " item.");
- }
- }
- else
- {
- return (char) c;
- }
- }
-
- public byte[] getBytes(String string) throws JMSException
+ public byte[] getBytes(String name) throws JMSException
{
- return getBytes(new AMQShortString(string));
- }
-
- public byte[] getBytes(AMQShortString string) throws JMSException
- {
- checkPropertyName(string);
-
- byte[] bs = getHeaders().getBytes(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ byte[] bs = _headers.getBytes(amqpName);
if (bs == null)
{
- throw new MessageFormatException("getBytes can't use " + string + " item.");
+ throw new MessageFormatException("getBytes can't use " + name + " item.");
}
else
{
@@ -149,19 +122,20 @@ public final class JMSHeaderAdapter
}
}
- public byte getByte(String string) throws JMSException
+ public byte getByte(String name) throws JMSException
{
- checkPropertyName(string);
- Byte b = getHeaders().getByte(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Byte b = _headers.getByte(amqpName);
if (b == null)
{
- if (getHeaders().containsKey(string))
+ if (_headers.containsKey(amqpName))
{
- Object str = getHeaders().getObject(string);
+ Object str = _headers.getObject(amqpName);
if (!(str instanceof String))
{
- throw new MessageFormatException("getByte can't use " + string + " item.");
+ throw new MessageFormatException("getByte can't use " + name + " item.");
}
else
{
@@ -177,59 +151,63 @@ public final class JMSHeaderAdapter
return b;
}
- public short getShort(String string) throws JMSException
+ public short getShort(String name) throws JMSException
{
- checkPropertyName(string);
- Short s = getHeaders().getShort(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Short s = _headers.getShort(amqpName);
if (s == null)
{
- s = Short.valueOf(getByte(string));
+ s = Short.valueOf(getByte(amqpName));
}
return s;
}
- public int getInteger(String string) throws JMSException
+ public int getInteger(String name) throws JMSException
{
- checkPropertyName(string);
- Integer i = getHeaders().getInteger(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Integer i = _headers.getInteger(amqpName);
if (i == null)
{
- i = Integer.valueOf(getShort(string));
+ i = Integer.valueOf(getShort(amqpName));
}
return i;
}
- public long getLong(String string) throws JMSException
+ public long getLong(String name) throws JMSException
{
- checkPropertyName(string);
- Long l = getHeaders().getLong(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Long l = _headers.getLong(amqpName);
if (l == null)
{
- l = Long.valueOf(getInteger(string));
+ l = Long.valueOf(getInteger(amqpName));
}
return l;
}
- public float getFloat(String string) throws JMSException
+ public float getFloat(String name) throws JMSException
{
- checkPropertyName(string);
- Float f = getHeaders().getFloat(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Float f = _headers.getFloat(amqpName);
if (f == null)
{
- if (getHeaders().containsKey(string))
+ if (_headers.containsKey(amqpName))
{
- Object str = getHeaders().getObject(string);
+ Object str = _headers.getObject(amqpName);
if (!(str instanceof String))
{
- throw new MessageFormatException("getFloat can't use " + string + " item.");
+ throw new MessageFormatException("getFloat can't use " + name + " item.");
}
else
{
@@ -238,7 +216,7 @@ public final class JMSHeaderAdapter
}
else
{
- throw new NullPointerException("No such property: " + string);
+ throw new NullPointerException("No such property: " + name);
}
}
@@ -246,32 +224,34 @@ public final class JMSHeaderAdapter
return f;
}
- public double getDouble(String string) throws JMSException
+ public double getDouble(String name) throws JMSException
{
- checkPropertyName(string);
- Double d = getHeaders().getDouble(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ Double d = _headers.getDouble(amqpName);
if (d == null)
{
- d = Double.valueOf(getFloat(string));
+ d = Double.valueOf(getFloat(amqpName));
}
return d;
}
- public String getString(String string) throws JMSException
+ public String getString(String name) throws JMSException
{
- checkPropertyName(string);
- String s = getHeaders().getString(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ String s = _headers.getString(amqpName);
if (s == null)
{
- if (getHeaders().containsKey(string))
+ if (_headers.containsKey(amqpName))
{
- Object o = getHeaders().getObject(string);
+ Object o = _headers.getObject(amqpName);
if (o instanceof byte[])
{
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
+ throw new MessageFormatException("getObject couldn't find " + name + " item.");
}
else
{
@@ -290,115 +270,76 @@ public final class JMSHeaderAdapter
return s;
}
- public Object getObject(String string) throws JMSException
- {
- checkPropertyName(string);
- return getHeaders().getObject(string);
- }
-
- public void setBoolean(AMQShortString string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setBoolean(string, b);
- }
-
- public void setBoolean(String string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setBoolean(string, b);
- }
-
- public void setChar(String string, char c) throws JMSException
- {
- checkPropertyName(string);
- getHeaders().setChar(string, c);
- }
-
- public Object setBytes(AMQShortString string, byte[] bytes)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(String string, byte[] bytes)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(String string, byte[] bytes, int start, int length)
- {
- checkPropertyName(string);
- return getHeaders().setBytes(string, bytes, start, length);
- }
-
- public void setByte(String string, byte b) throws JMSException
+ public Object getObject(String name) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setByte(string, b);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ return _headers.getObject(amqpName);
}
- public void setByte(AMQShortString string, byte b) throws JMSException
+ public void setBoolean(String name, boolean b) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setByte(string, b);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setBoolean(amqpName, b);
}
-
- public void setShort(String string, short i) throws JMSException
+ public void setByte(String name, byte b) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setShort(string, i);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setByte(amqpName, b);
}
- public void setInteger(String string, int i) throws JMSException
+ public void setShort(String name, short i) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setInteger(string, i);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setShort(amqpName, i);
}
- public void setInteger(AMQShortString string, int i) throws JMSException
+ public void setInteger(String name, int i) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setInteger(string, i);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setInteger(amqpName, i);
}
- public void setLong(String string, long l) throws JMSException
+ public void setLong(String name, long l) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setLong(string, l);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setLong(amqpName, l);
}
- public void setFloat(String string, float v) throws JMSException
+ public void setFloat(String name, float v) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setFloat(string, v);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setFloat(amqpName, v);
}
- public void setDouble(String string, double v) throws JMSException
+ public void setDouble(String name, double v) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setDouble(string, v);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setDouble(amqpName, v);
}
- public void setString(String string, String string1) throws JMSException
+ public void setString(String name, String value) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setString(string, string1);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
+ _headers.setString(amqpName, value);
}
- public void setString(AMQShortString string, String string1) throws JMSException
+ public void setObject(String name, Object object) throws JMSException
{
- checkPropertyName(string);
- getHeaders().setString(string, string1);
- }
-
- public void setObject(String string, Object object) throws JMSException
- {
- checkPropertyName(string);
+ checkPropertyName(name);
+ String amqpName = mapJmsToAmqpName(name);
try
{
- getHeaders().setObject(string, object);
+ _headers.setObject(amqpName, object);
}
catch (AMQPInvalidClassException aice)
{
@@ -409,85 +350,45 @@ public final class JMSHeaderAdapter
}
}
- public boolean itemExists(String string) throws JMSException
- {
- checkPropertyName(string);
- return getHeaders().containsKey(string);
- }
-
- public Enumeration getPropertyNames()
+ public Set<String> getPropertyNames()
{
- return getHeaders().getPropertyNames();
+ Set<String> names = new LinkedHashSet<>(_headers.keys());
+ for(Map.Entry<String,String> entry : AMQP_TO_JMS_HEADER_NAME_MAPPINGS.entrySet())
+ {
+ if(names.contains(entry.getKey()))
+ {
+ names.add(entry.getValue());
+ if(STRICT_JMS)
+ {
+ names.remove(entry.getKey());
+ }
+ }
+ }
+ return names;
}
public void clear()
{
- getHeaders().clear();
- }
-
- public boolean propertyExists(AMQShortString propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().propertyExists(propertyName);
+ _headers.clear();
}
- public boolean propertyExists(String propertyName)
+ public boolean propertyExists(String name)
{
- checkPropertyName(propertyName);
- return getHeaders().propertyExists(propertyName);
+ checkPropertyName(name);
+ String propertyName = mapJmsToAmqpName(name);
+ return _headers.propertyExists(propertyName);
}
- public Object put(Object key, Object value)
+ public Object remove(String name)
{
- checkPropertyName(key.toString());
- return getHeaders().setObject(key.toString(), value);
- }
-
- public Object remove(AMQShortString propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().remove(propertyName);
- }
-
- public Object remove(String propertyName)
- {
- checkPropertyName(propertyName);
- return getHeaders().remove(propertyName);
+ checkPropertyName(name);
+ String propertyName = mapJmsToAmqpName(name);
+ return _headers.remove(propertyName);
}
public boolean isEmpty()
{
- return getHeaders().isEmpty();
- }
-
- public void writeToBuffer(final ByteBuffer data)
- {
- try
- {
- getHeaders().writeToBuffer(new DataOutputStream(new OutputStream()
- {
- @Override
- public void write(final int b)
- {
- data.put((byte)b);
- }
-
- @Override
- public void write(final byte[] b, final int off, final int len)
- {
- data.put(b, off, len);
- }
- }));
- }
- catch (IOException e)
- {
- throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e);
- }
- }
-
- public Enumeration getMapNames()
- {
- return getPropertyNames();
+ return _headers.isEmpty();
}
protected void checkPropertyName(CharSequence propertyName)
@@ -501,10 +402,10 @@ public final class JMSHeaderAdapter
throw new IllegalArgumentException("Property name must not be the empty string");
}
- checkIdentiferFormat(propertyName);
+ checkIdentifierFormat(propertyName);
}
- protected void checkIdentiferFormat(CharSequence propertyName)
+ protected void checkIdentifierFormat(CharSequence propertyName)
{
// JMS requirements 3.5.1 Property Names
// Identifiers:
@@ -536,7 +437,7 @@ public final class JMSHeaderAdapter
// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
// null and if so are treated as a NULL value.
- if (Boolean.getBoolean("strict-jms"))
+ if (STRICT_JMS)
{
// JMS start character
if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
index 70c6aa4c75..b073275421 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.message;
+import java.util.List;
+
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
@@ -29,16 +33,18 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
-import javax.jms.JMSException;
-import java.util.List;
-
public interface MessageFactory
{
- AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ AbstractJMSMessage createMessage(long deliveryTag,
+ boolean redelivered,
ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ List bodies,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType)
throws JMSException, AMQException;
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 7e1ce20238..de46887895 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.message;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +40,6 @@ import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class MessageFactoryRegistry
{
/**
@@ -95,19 +96,24 @@ public class MessageFactoryRegistry
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
*
- *
- * @param deliveryTag the AMQ message id
+ * @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
* @param bodies a list of ContentBody instances @return the message.
* @param queueDestinationCache
- *@param topicDestinationCache @throws AMQException
+ * @param topicDestinationCache @throws AMQException
+ * @param addressType
* @throws JMSException
*/
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies,
+ public AbstractJMSMessage createMessage(long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ ContentHeaderBody contentHeader,
+ List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ final int addressType)
throws AMQException, JMSException
{
BasicContentHeaderProperties properties = contentHeader.getProperties();
@@ -124,7 +130,7 @@ public class MessageFactoryRegistry
mf = _default;
}
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
}
public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
index 21bb206349..254980d554 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
@@ -30,7 +30,8 @@ public class QpidMessageProperties
}
public static final String QPID_SUBJECT = "qpid.subject";
- public static final String QPID_SUBJECT_JMS_PROPER = "qpid_subject";
+ public static final String QPID_SUBJECT_JMS_PROPERTY = "JMS_qpid_subject";
+ public static final String QPID_SUBJECT_JMS_PROPER = QPID_SUBJECT_JMS_PROPERTY.substring(4);
// AMQP 0-10 related properties
public static final String AMQP_0_10_APP_ID = "x-amqp-0-10.app-id";
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 99154e820f..d79473d72c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -158,7 +158,7 @@ public class AddressHelper
}
}
- public int getNodeType() throws Exception
+ public int getNodeType()
{
if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
{
@@ -175,7 +175,7 @@ public class AddressHelper
}
else
{
- throw new Exception("unkown exchange type");
+ throw new IllegalArgumentException("unknown exchange type");
}
}
@@ -211,7 +211,7 @@ public class AddressHelper
return (result == null) ? defaultValue : result.booleanValue();
}
- public Link getLink() throws Exception
+ public Link getLink()
{
Link link = new Link();
link.setSubscription(new Subscription());
@@ -234,7 +234,7 @@ public class AddressHelper
}
else
{
- throw new Exception("The reliability mode '" +
+ throw new IllegalArgumentException("The reliability mode '" +
reliability + "' is not yet supported");
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 8c23ddad5e..a4c3d20042 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -255,7 +255,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
try
{
- return AMQDestination.createDestination(str);
+ return AMQDestination.createDestination(str, false);
}
catch (Exception e)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java
index 68f678c1b8..2edd38ea55 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java
@@ -124,7 +124,7 @@ public class AMQMessageDelegate_0_10Test extends QpidTestCase
for (Enumeration props = delegate.getPropertyNames(); props.hasMoreElements();)
{
String key = (String)props.nextElement();
- if (key.equals("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER))
+ if (key.equals(QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY))
{
propFound = true;
}
diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
index 8775362eb5..edb82796e7 100644
--- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
+++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
@@ -305,7 +305,7 @@ public abstract class QpidExceptionHandler implements ExceptionListener
}
else
{
- _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination());
+ _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination(), false);
if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
{
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 391498194b..f4b86c7efd 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1212,7 +1212,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
private void replyToTest(String replyTo) throws Exception
{
Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination replyToDest = AMQDestination.createDestination(replyTo);
+ Destination replyToDest = AMQDestination.createDestination(replyTo, false);
MessageConsumer replyToCons = session.createConsumer(replyToDest);
Destination dest = session.createQueue("ADDR:amq.direct/test");
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
index 097b021b3e..7ceef47573 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
@@ -30,7 +30,6 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.messaging.Address;
@@ -107,7 +106,7 @@ public class MercuryBase
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = createDestination();
- controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR);
+ controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false);
myControlQueue = session.createQueue(myControlQueueAddr);
msgType = MessageType.getType(config.getMessageType());
_logger.debug("Using " + msgType + " messages");
@@ -122,7 +121,7 @@ public class MercuryBase
{
_logger.debug("Prefix : " + prefix);
Address addr = Address.parse(config.getAddress());
- AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress());
+ AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false);
int type = ((AMQSession_0_10)session).resolveAddressType(temp);
if ( type == AMQDestination.TOPIC_TYPE)
@@ -136,11 +135,11 @@ public class MercuryBase
System.out.println("Setting name : " + addr);
}
- return AMQDestination.createDestination(addr.toString());
+ return AMQDestination.createDestination(addr.toString(), false);
}
else
{
- return AMQDestination.createDestination(config.getAddress());
+ return AMQDestination.createDestination(config.getAddress(), false);
}
}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
index 6dd8b7e1ca..4092f0d59d 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
@@ -34,10 +34,8 @@ import javax.jms.TextMessage;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
import org.apache.qpid.tools.report.BasicReporter;
import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.Throughput;
import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,7 +105,7 @@ public class QpidReceive implements MessageListener
if (config.getReadyAddress() != null)
{
MessageProducer prod = session.createProducer(AMQDestination
- .createDestination(config.getReadyAddress()));
+ .createDestination(config.getReadyAddress(), false));
prod.send(session.createMessage());
if (_logger.isDebugEnabled())
{
@@ -193,7 +191,7 @@ public class QpidReceive implements MessageListener
System.out,
config.reportEvery(),
config.isReportHeader());
- Destination dest = AMQDestination.createDestination(config.getAddress());
+ Destination dest = AMQDestination.createDestination(config.getAddress(), false);
QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
receiver.setUp();
receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS());
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
index 3d321dcade..58a643726c 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
@@ -286,7 +286,7 @@ public class QpidSend
config.reportEvery(),
config.isReportHeader()
);
- Destination dest = AMQDestination.createDestination(config.getAddress());
+ Destination dest = AMQDestination.createDestination(config.getAddress(), false);
QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
sender.setUp();
sender.send();