From e514c426b243b53ede124fbe0ee8077be26f3b5b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 2 Sep 2014 16:05:58 +0000 Subject: Merged QPID-6052 (r1621143 r1621148 r1621149 r1621150 r1621826) to 0.30 from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1622044 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQAnyDestination.java | 2 +- .../org/apache/qpid/client/AMQDestination.java | 26 +- .../main/java/org/apache/qpid/client/AMQQueue.java | 13 +- .../java/org/apache/qpid/client/AMQSession.java | 10 +- .../org/apache/qpid/client/AMQSession_0_8.java | 3 +- .../main/java/org/apache/qpid/client/AMQTopic.java | 14 +- .../apache/qpid/client/BasicMessageConsumer.java | 13 +- .../qpid/client/BasicMessageConsumer_0_8.java | 4 +- .../apache/qpid/client/TopicSubscriberAdaptor.java | 5 +- .../client/message/AMQMessageDelegate_0_10.java | 52 +-- .../client/message/AMQMessageDelegate_0_8.java | 83 +++-- .../client/message/AbstractAMQMessageDelegate.java | 86 ++++- .../client/message/AbstractJMSMessageFactory.java | 18 +- .../qpid/client/message/JMSHeaderAdapter.java | 415 ++++++++------------- .../apache/qpid/client/message/MessageFactory.java | 18 +- .../client/message/MessageFactoryRegistry.java | 30 +- .../qpid/client/message/QpidMessageProperties.java | 3 +- .../client/messaging/address/AddressHelper.java | 8 +- .../jndi/PropertiesFileInitialContextFactory.java | 2 +- .../message/AMQMessageDelegate_0_10Test.java | 2 +- .../qpid/ra/inflow/QpidExceptionHandler.java | 2 +- .../destination/AddressBasedDestinationTest.java | 2 +- .../java/org/apache/qpid/tools/MercuryBase.java | 9 +- .../java/org/apache/qpid/tools/QpidReceive.java | 6 +- .../main/java/org/apache/qpid/tools/QpidSend.java | 2 +- 25 files changed, 421 insertions(+), 407 deletions(-) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 0e1658d2b3..57f8343874 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -58,7 +58,7 @@ public class AMQAnyDestination extends AMQDestination implements Queue, Topic super(str); } - public AMQAnyDestination(Address addr) throws Exception + public AMQAnyDestination(Address addr) { super(addr); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 2714caf2a1..a71555480f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -211,7 +211,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte { } - protected AMQDestination(Address address) throws Exception + protected AMQDestination(Address address) { this._address = address; getInfoFromAddress(); @@ -749,7 +749,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte } } - public static Destination createDestination(String str) throws Exception + public static Destination createDestination(String str, final boolean useNodeTypeForDestinationType) + throws URISyntaxException { DestSyntax syntax = getDestType(str); str = stripSyntaxPrefix(str); @@ -760,7 +761,24 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte else { Address address = createAddressFromString(str); - return new AMQAnyDestination(address); + if(useNodeTypeForDestinationType) + { + AddressHelper helper = new AddressHelper(address); + switch(helper.getNodeType()) + { + case AMQDestination.QUEUE_TYPE: + return new AMQQueue(address); + case AMQDestination.TOPIC_TYPE: + return new AMQTopic(address); + default: + return new AMQAnyDestination(address); + } + + } + else + { + return new AMQAnyDestination(address); + } } } @@ -912,7 +930,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte return Address.parse(str); } - private void getInfoFromAddress() throws Exception + private void getInfoFromAddress() { _name = _address.getName(); _subject = _address.getSubject(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index f65ecece2b..9b00883dfb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -20,13 +20,15 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + +import javax.jms.Queue; + import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; import org.apache.qpid.url.BindingURL; -import javax.jms.Queue; -import java.net.URISyntaxException; - public class AMQQueue extends AMQDestination implements Queue { private static final long serialVersionUID = -1283142598932655606L; @@ -36,6 +38,11 @@ public class AMQQueue extends AMQDestination implements Queue super(); } + public AMQQueue(Address address) + { + super(address); + } + public AMQQueue(String address) throws URISyntaxException { super(address); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0183c30276..3562a10f27 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1423,7 +1423,7 @@ public abstract class AMQSession extends Closeable implements Messa private List _closedStack = null; private boolean _isDurableSubscriber = false; + private int _addressType = AMQDestination.UNKNOWN_TYPE; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -203,7 +204,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } _arguments = ft; - + _addressType = _destination.getAddressType(); } public AMQDestination getDestination() @@ -1066,4 +1067,14 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { _isDurableSubscriber = true; } + + void setAddressType(final int addressType) + { + _addressType = addressType; + } + + int getAddressType() + { + return _addressType; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cdffc73932..459030a10d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -151,7 +151,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer implements TopicSub _topic = topic; _consumer = consumer; _noLocal = noLocal; + consumer.setAddressType(AMQDestination.TOPIC_TYPE); } TopicSubscriberAdaptor(Topic topic, C consumer) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index bd089eb6a8..fa0a6be91c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -129,11 +129,11 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate if (subject != null) { messageProps.getApplicationHeaders().remove(QpidMessageProperties.QPID_SUBJECT); - messageProps.getApplicationHeaders().put("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER,subject); + messageProps.getApplicationHeaders().put(QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY,subject); } } dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(), - _deliveryProps.getRoutingKey(), subject); + _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE); } setJMSDestination(dest); @@ -280,7 +280,8 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else { - dest = convertToAddressBasedDestination(exchange,routingKey,null); + dest = convertToAddressBasedDestination(exchange,routingKey,null, false, + AMQDestination.UNKNOWN_TYPE); } _destinationCache.put(replyTo, dest); } @@ -288,49 +289,6 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate return dest; } } - - private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject) - { - String addr; - boolean isQueue = true; - if ("".equals(exchange)) // type Queue - { - subject = (subject == null) ? "" : "/" + subject; - addr = routingKey + subject; - } - else - { - addr = exchange + "/" + routingKey; - isQueue = false; - } - - try - { - AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr); - if (isQueue) - { - dest.setQueueName(new AMQShortString(routingKey)); - dest.setRoutingKey(new AMQShortString(routingKey)); - dest.setExchangeName(new AMQShortString("")); - } - else - { - dest.setRoutingKey(new AMQShortString(routingKey)); - dest.setExchangeName(new AMQShortString(exchange)); - } - return dest; - } - catch(Exception e) - { - // An exception is only thrown here if the address syntax is invalid. - // Logging the exception, but not throwing as this is only important to Qpid developers. - // An exception here means a bug in the code. - _logger.error("Exception when constructing an address string from the ReplyTo struct"); - - // falling back to the old way of doing it to ensure the application continues. - return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); - } - } public void setJMSReplyTo(Destination destination) throws JMSException { @@ -747,7 +705,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else if (isStrictJMS && QpidMessageProperties.QPID_SUBJECT.equals(propertyName)) { - return (String)getApplicationHeaders().get("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER); + return (String)getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index d9d2cf85d3..486023e847 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -21,6 +21,19 @@ package org.apache.qpid.client.message; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import javax.jms.Queue; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -32,17 +45,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; -import javax.jms.Queue; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; - public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { @@ -63,6 +65,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate }); public static final String JMS_TYPE = "x-jms-type"; + public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms"); private boolean _readableProperties = false; @@ -96,7 +99,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate // Used when generating a received message object protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey, AMQSession_0_8.DestinationCache queueDestinationCache, - AMQSession_0_8.DestinationCache topicDestinationCache) + AMQSession_0_8.DestinationCache topicDestinationCache, + int addressType) { this(contentHeader, deliveryTag); @@ -104,28 +108,46 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate AMQDestination dest = null; - // If we have a type set the attempt to use that. - if (type != null) + if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL) { - switch (type.intValue()) + // If we have a type set the attempt to use that. + if (type != null) { - case AMQDestination.QUEUE_TYPE: - dest = queueDestinationCache.getDestination(exchange, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = topicDestinationCache.getDestination(exchange, routingKey); - break; - default: - // Use the generateDestination method - dest = null; + switch (type.intValue()) + { + case AMQDestination.QUEUE_TYPE: + dest = queueDestinationCache.getDestination(exchange, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = topicDestinationCache.getDestination(exchange, routingKey); + break; + default: + // Use the generateDestination method + dest = null; + } } - } - if (dest == null) + if (dest == null) + { + dest = generateDestination(exchange, routingKey); + } + } + else { - dest = generateDestination(exchange, routingKey); + String subject = null; + if (contentHeader.getHeaders() != null + && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT)) + { + subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT); + } + if(type == null) + { + type = addressType; + } + dest = (AMQDestination) convertToAddressBasedDestination(AMQShortString.toString(exchange), + AMQShortString.toString(routingKey), subject, + true, type); } - setJMSDestination(dest); } @@ -484,7 +506,8 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate public Enumeration getPropertyNames() throws JMSException { - return getJmsHeaders().getPropertyNames(); + Set keys = getJmsHeaders().getPropertyNames(); + return Collections.enumeration(keys); } public void setBooleanProperty(String propertyName, boolean b) throws JMSException @@ -556,7 +579,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } checkWritableProperties(); - getJmsHeaders().setDouble(propertyName, new Double(v)); + getJmsHeaders().setDouble(propertyName, v); } public void setStringProperty(String propertyName, String value) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 784c33cf02..d1ca6adb60 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.client.message; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import javax.jms.JMSException; -import javax.jms.Session; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * This abstract class provides exchange lookup functionality that is shared * between all MessageDelegates. Update facilities are provided so that the 0-10 @@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { + private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate.class); private static Map _exchangeTypeToDestinationType = new ConcurrentHashMap(); private static Map _exchangeMap = new ConcurrentHashMap(); @@ -222,6 +228,76 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate { return _session; } + + protected Destination convertToAddressBasedDestination(String exchange, + String routingKey, + String subject, + boolean useNodeTypeForDestinationType, + int type) + { + String addr; + boolean isQueue = true; + if ("".equals(exchange)) // type Queue + { + subject = (subject == null) ? "" : "/" + subject; + addr = routingKey + subject; + + } + else + { + addr = exchange + "/" + routingKey; + isQueue = false; + } + + if(useNodeTypeForDestinationType) + { + if(type == AMQDestination.UNKNOWN_TYPE && "".equals(exchange)) + { + type = AMQDestination.QUEUE_TYPE; + } + + switch(type) + { + case AMQDestination.QUEUE_TYPE: + addr = addr + " ; { node: { type: queue } } "; + break; + case AMQDestination.TOPIC_TYPE: + addr = addr + " ; { node: { type: topic } } "; + break; + default: + // do nothing + } + } + + + try + { + AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr, + useNodeTypeForDestinationType); + if (isQueue) + { + dest.setQueueName(new AMQShortString(routingKey)); + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString("")); + } + else + { + dest.setRoutingKey(new AMQShortString(routingKey)); + dest.setExchangeName(new AMQShortString(exchange)); + } + return dest; + } + catch(Exception e) + { + // An exception is only thrown here if the address syntax is invalid. + // Logging the exception, but not throwing as this is only important to Qpid developers. + // An exception here means a bug in the code. + _logger.error("Exception when constructing an address string from the ReplyTo struct"); + + // falling back to the old way of doing it to ensure the application continues. + return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + } + } } class ExchangeInfo diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 9748038b9b..65f4478baa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -47,11 +47,14 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class); - protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, + protected AbstractJMSMessage create08MessageWithBody(long messageNbr, + ContentHeaderBody contentHeader, + AMQShortString exchange, + AMQShortString routingKey, List bodies, AMQSession_0_8.DestinationCache queueDestinationCache, - AMQSession_0_8.DestinationCache topicDestinationCache) throws AMQException + AMQSession_0_8.DestinationCache topicDestinationCache, + final int addressType) throws AMQException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); @@ -117,7 +120,8 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, contentHeader.getProperties(), - exchange, routingKey, queueDestinationCache, topicDestinationCache); + exchange, routingKey, queueDestinationCache, + topicDestinationCache, addressType); return createMessage(delegate, data); } @@ -162,13 +166,15 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory return message; } + @Override public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, List bodies, AMQSession_0_8.DestinationCache queueDestinationCache, - AMQSession_0_8.DestinationCache topicDestinationCache) + AMQSession_0_8.DestinationCache topicDestinationCache, + int addressType) throws JMSException, AMQException { - final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache); + final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType); msg.setJMSRedelivered(redelivered); msg.setReceivedFromServer(); return msg; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 122a5c4ef2..e27b3d81cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -20,77 +20,76 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Enumeration; + +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.FieldTable; public final class JMSHeaderAdapter { - private final FieldTable _headers; + private static final Map AMQP_TO_JMS_HEADER_NAME_MAPPINGS; + private static final Map JMS_TO_AMQP_HEADER_NAME_MAPPINGS; - public JMSHeaderAdapter(FieldTable headers) - { - _headers = headers; - } - public FieldTable getHeaders() + static { - return _headers; - } + String[][] mappings = { + { QpidMessageProperties.QPID_SUBJECT, QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY } + }; - public boolean getBoolean(String string) throws JMSException - { - checkPropertyName(string); - Boolean b = getHeaders().getBoolean(string); + Map amqpToJmsHeaderNameMappings = new HashMap<>(); + Map jmsToAmqpHeaderNameMappings = new HashMap<>(); - if (b == null) + for(String[] mapping : mappings) { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (!(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } + amqpToJmsHeaderNameMappings.put(mapping[0], mapping[1]); + jmsToAmqpHeaderNameMappings.put(mapping[1], mapping[0]); } - return b; + AMQP_TO_JMS_HEADER_NAME_MAPPINGS = Collections.unmodifiableMap(amqpToJmsHeaderNameMappings); + JMS_TO_AMQP_HEADER_NAME_MAPPINGS = Collections.unmodifiableMap(jmsToAmqpHeaderNameMappings); } - public boolean getBoolean(AMQShortString string) throws JMSException + private static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms"); + + + private final FieldTable _headers; + + public JMSHeaderAdapter(FieldTable headers) { - checkPropertyName(string); - Boolean b = getHeaders().getBoolean(string); + _headers = headers; + } + + + private String mapJmsToAmqpName(String name) + { + return JMS_TO_AMQP_HEADER_NAME_MAPPINGS.containsKey(name) ? JMS_TO_AMQP_HEADER_NAME_MAPPINGS.get(name) : name; + } + + public boolean getBoolean(String name) throws JMSException + { + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Boolean b = _headers.getBoolean(amqpName); if (b == null) { - if (getHeaders().containsKey(string)) + if (_headers.containsKey(amqpName)) { - Object str = getHeaders().getObject(string); + Object str = _headers.getObject(amqpName); if (!(str instanceof String)) { - throw new MessageFormatException("getBoolean can't use " + string + " item."); + throw new MessageFormatException("getBoolean can't use " + name + " item."); } else { @@ -106,42 +105,16 @@ public final class JMSHeaderAdapter return b; } - public char getCharacter(String string) throws JMSException - { - checkPropertyName(string); - Character c = getHeaders().getCharacter(string); - if (c == null) - { - if (getHeaders().isNullStringValue(string)) - { - throw new NullPointerException("Cannot convert null char"); - } - else - { - throw new MessageFormatException("getChar can't use " + string + " item."); - } - } - else - { - return (char) c; - } - } - - public byte[] getBytes(String string) throws JMSException + public byte[] getBytes(String name) throws JMSException { - return getBytes(new AMQShortString(string)); - } - - public byte[] getBytes(AMQShortString string) throws JMSException - { - checkPropertyName(string); - - byte[] bs = getHeaders().getBytes(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + byte[] bs = _headers.getBytes(amqpName); if (bs == null) { - throw new MessageFormatException("getBytes can't use " + string + " item."); + throw new MessageFormatException("getBytes can't use " + name + " item."); } else { @@ -149,19 +122,20 @@ public final class JMSHeaderAdapter } } - public byte getByte(String string) throws JMSException + public byte getByte(String name) throws JMSException { - checkPropertyName(string); - Byte b = getHeaders().getByte(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Byte b = _headers.getByte(amqpName); if (b == null) { - if (getHeaders().containsKey(string)) + if (_headers.containsKey(amqpName)) { - Object str = getHeaders().getObject(string); + Object str = _headers.getObject(amqpName); if (!(str instanceof String)) { - throw new MessageFormatException("getByte can't use " + string + " item."); + throw new MessageFormatException("getByte can't use " + name + " item."); } else { @@ -177,59 +151,63 @@ public final class JMSHeaderAdapter return b; } - public short getShort(String string) throws JMSException + public short getShort(String name) throws JMSException { - checkPropertyName(string); - Short s = getHeaders().getShort(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Short s = _headers.getShort(amqpName); if (s == null) { - s = Short.valueOf(getByte(string)); + s = Short.valueOf(getByte(amqpName)); } return s; } - public int getInteger(String string) throws JMSException + public int getInteger(String name) throws JMSException { - checkPropertyName(string); - Integer i = getHeaders().getInteger(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Integer i = _headers.getInteger(amqpName); if (i == null) { - i = Integer.valueOf(getShort(string)); + i = Integer.valueOf(getShort(amqpName)); } return i; } - public long getLong(String string) throws JMSException + public long getLong(String name) throws JMSException { - checkPropertyName(string); - Long l = getHeaders().getLong(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Long l = _headers.getLong(amqpName); if (l == null) { - l = Long.valueOf(getInteger(string)); + l = Long.valueOf(getInteger(amqpName)); } return l; } - public float getFloat(String string) throws JMSException + public float getFloat(String name) throws JMSException { - checkPropertyName(string); - Float f = getHeaders().getFloat(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Float f = _headers.getFloat(amqpName); if (f == null) { - if (getHeaders().containsKey(string)) + if (_headers.containsKey(amqpName)) { - Object str = getHeaders().getObject(string); + Object str = _headers.getObject(amqpName); if (!(str instanceof String)) { - throw new MessageFormatException("getFloat can't use " + string + " item."); + throw new MessageFormatException("getFloat can't use " + name + " item."); } else { @@ -238,7 +216,7 @@ public final class JMSHeaderAdapter } else { - throw new NullPointerException("No such property: " + string); + throw new NullPointerException("No such property: " + name); } } @@ -246,32 +224,34 @@ public final class JMSHeaderAdapter return f; } - public double getDouble(String string) throws JMSException + public double getDouble(String name) throws JMSException { - checkPropertyName(string); - Double d = getHeaders().getDouble(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + Double d = _headers.getDouble(amqpName); if (d == null) { - d = Double.valueOf(getFloat(string)); + d = Double.valueOf(getFloat(amqpName)); } return d; } - public String getString(String string) throws JMSException + public String getString(String name) throws JMSException { - checkPropertyName(string); - String s = getHeaders().getString(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + String s = _headers.getString(amqpName); if (s == null) { - if (getHeaders().containsKey(string)) + if (_headers.containsKey(amqpName)) { - Object o = getHeaders().getObject(string); + Object o = _headers.getObject(amqpName); if (o instanceof byte[]) { - throw new MessageFormatException("getObject couldn't find " + string + " item."); + throw new MessageFormatException("getObject couldn't find " + name + " item."); } else { @@ -290,115 +270,76 @@ public final class JMSHeaderAdapter return s; } - public Object getObject(String string) throws JMSException - { - checkPropertyName(string); - return getHeaders().getObject(string); - } - - public void setBoolean(AMQShortString string, boolean b) throws JMSException - { - checkPropertyName(string); - getHeaders().setBoolean(string, b); - } - - public void setBoolean(String string, boolean b) throws JMSException - { - checkPropertyName(string); - getHeaders().setBoolean(string, b); - } - - public void setChar(String string, char c) throws JMSException - { - checkPropertyName(string); - getHeaders().setChar(string, c); - } - - public Object setBytes(AMQShortString string, byte[] bytes) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes); - } - - public Object setBytes(String string, byte[] bytes) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes); - } - - public Object setBytes(String string, byte[] bytes, int start, int length) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes, start, length); - } - - public void setByte(String string, byte b) throws JMSException + public Object getObject(String name) throws JMSException { - checkPropertyName(string); - getHeaders().setByte(string, b); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + return _headers.getObject(amqpName); } - public void setByte(AMQShortString string, byte b) throws JMSException + public void setBoolean(String name, boolean b) throws JMSException { - checkPropertyName(string); - getHeaders().setByte(string, b); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setBoolean(amqpName, b); } - - public void setShort(String string, short i) throws JMSException + public void setByte(String name, byte b) throws JMSException { - checkPropertyName(string); - getHeaders().setShort(string, i); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setByte(amqpName, b); } - public void setInteger(String string, int i) throws JMSException + public void setShort(String name, short i) throws JMSException { - checkPropertyName(string); - getHeaders().setInteger(string, i); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setShort(amqpName, i); } - public void setInteger(AMQShortString string, int i) throws JMSException + public void setInteger(String name, int i) throws JMSException { - checkPropertyName(string); - getHeaders().setInteger(string, i); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setInteger(amqpName, i); } - public void setLong(String string, long l) throws JMSException + public void setLong(String name, long l) throws JMSException { - checkPropertyName(string); - getHeaders().setLong(string, l); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setLong(amqpName, l); } - public void setFloat(String string, float v) throws JMSException + public void setFloat(String name, float v) throws JMSException { - checkPropertyName(string); - getHeaders().setFloat(string, v); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setFloat(amqpName, v); } - public void setDouble(String string, double v) throws JMSException + public void setDouble(String name, double v) throws JMSException { - checkPropertyName(string); - getHeaders().setDouble(string, v); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setDouble(amqpName, v); } - public void setString(String string, String string1) throws JMSException + public void setString(String name, String value) throws JMSException { - checkPropertyName(string); - getHeaders().setString(string, string1); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); + _headers.setString(amqpName, value); } - public void setString(AMQShortString string, String string1) throws JMSException + public void setObject(String name, Object object) throws JMSException { - checkPropertyName(string); - getHeaders().setString(string, string1); - } - - public void setObject(String string, Object object) throws JMSException - { - checkPropertyName(string); + checkPropertyName(name); + String amqpName = mapJmsToAmqpName(name); try { - getHeaders().setObject(string, object); + _headers.setObject(amqpName, object); } catch (AMQPInvalidClassException aice) { @@ -409,85 +350,45 @@ public final class JMSHeaderAdapter } } - public boolean itemExists(String string) throws JMSException - { - checkPropertyName(string); - return getHeaders().containsKey(string); - } - - public Enumeration getPropertyNames() + public Set getPropertyNames() { - return getHeaders().getPropertyNames(); + Set names = new LinkedHashSet<>(_headers.keys()); + for(Map.Entry entry : AMQP_TO_JMS_HEADER_NAME_MAPPINGS.entrySet()) + { + if(names.contains(entry.getKey())) + { + names.add(entry.getValue()); + if(STRICT_JMS) + { + names.remove(entry.getKey()); + } + } + } + return names; } public void clear() { - getHeaders().clear(); - } - - public boolean propertyExists(AMQShortString propertyName) - { - checkPropertyName(propertyName); - return getHeaders().propertyExists(propertyName); + _headers.clear(); } - public boolean propertyExists(String propertyName) + public boolean propertyExists(String name) { - checkPropertyName(propertyName); - return getHeaders().propertyExists(propertyName); + checkPropertyName(name); + String propertyName = mapJmsToAmqpName(name); + return _headers.propertyExists(propertyName); } - public Object put(Object key, Object value) + public Object remove(String name) { - checkPropertyName(key.toString()); - return getHeaders().setObject(key.toString(), value); - } - - public Object remove(AMQShortString propertyName) - { - checkPropertyName(propertyName); - return getHeaders().remove(propertyName); - } - - public Object remove(String propertyName) - { - checkPropertyName(propertyName); - return getHeaders().remove(propertyName); + checkPropertyName(name); + String propertyName = mapJmsToAmqpName(name); + return _headers.remove(propertyName); } public boolean isEmpty() { - return getHeaders().isEmpty(); - } - - public void writeToBuffer(final ByteBuffer data) - { - try - { - getHeaders().writeToBuffer(new DataOutputStream(new OutputStream() - { - @Override - public void write(final int b) - { - data.put((byte)b); - } - - @Override - public void write(final byte[] b, final int off, final int len) - { - data.put(b, off, len); - } - })); - } - catch (IOException e) - { - throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e); - } - } - - public Enumeration getMapNames() - { - return getPropertyNames(); + return _headers.isEmpty(); } protected void checkPropertyName(CharSequence propertyName) @@ -501,10 +402,10 @@ public final class JMSHeaderAdapter throw new IllegalArgumentException("Property name must not be the empty string"); } - checkIdentiferFormat(propertyName); + checkIdentifierFormat(propertyName); } - protected void checkIdentiferFormat(CharSequence propertyName) + protected void checkIdentifierFormat(CharSequence propertyName) { // JMS requirements 3.5.1 Property Names // Identifiers: @@ -536,7 +437,7 @@ public final class JMSHeaderAdapter // JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be // null and if so are treated as a NULL value. - if (Boolean.getBoolean("strict-jms")) + if (STRICT_JMS) { // JMS start character if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java index 70c6aa4c75..b073275421 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.message; +import java.util.List; + +import javax.jms.JMSException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; @@ -29,16 +33,18 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; -import javax.jms.JMSException; -import java.util.List; - public interface MessageFactory { - AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, + AbstractJMSMessage createMessage(long deliveryTag, + boolean redelivered, ContentHeaderBody contentHeader, - AMQShortString exchange, AMQShortString routingKey, - List bodies, AMQSession_0_8.DestinationCache queueDestinationCache, AMQSession_0_8.DestinationCache topicDestinationCache) + AMQShortString exchange, + AMQShortString routingKey, + List bodies, + AMQSession_0_8.DestinationCache queueDestinationCache, + AMQSession_0_8.DestinationCache topicDestinationCache, + final int addressType) throws JMSException, AMQException; AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 7e1ce20238..de46887895 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client.message; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,11 +40,6 @@ import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; -import javax.jms.JMSException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class MessageFactoryRegistry { /** @@ -95,19 +96,24 @@ public class MessageFactoryRegistry * Create a message. This looks up the MIME type from the content header and instantiates the appropriate * concrete message type. * - * - * @param deliveryTag the AMQ message id + * @param deliveryTag the AMQ message id * @param redelivered true if redelivered * @param contentHeader the content header that was received * @param bodies a list of ContentBody instances @return the message. * @param queueDestinationCache - *@param topicDestinationCache @throws AMQException + * @param topicDestinationCache @throws AMQException + * @param addressType * @throws JMSException */ - public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange, - AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies, + public AbstractJMSMessage createMessage(long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, + ContentHeaderBody contentHeader, + List bodies, AMQSession_0_8.DestinationCache queueDestinationCache, - AMQSession_0_8.DestinationCache topicDestinationCache) + AMQSession_0_8.DestinationCache topicDestinationCache, + final int addressType) throws AMQException, JMSException { BasicContentHeaderProperties properties = contentHeader.getProperties(); @@ -124,7 +130,7 @@ public class MessageFactoryRegistry mf = _default; } - return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache); + return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType); } public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java index 21bb206349..254980d554 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java @@ -30,7 +30,8 @@ public class QpidMessageProperties } public static final String QPID_SUBJECT = "qpid.subject"; - public static final String QPID_SUBJECT_JMS_PROPER = "qpid_subject"; + public static final String QPID_SUBJECT_JMS_PROPERTY = "JMS_qpid_subject"; + public static final String QPID_SUBJECT_JMS_PROPER = QPID_SUBJECT_JMS_PROPERTY.substring(4); // AMQP 0-10 related properties public static final String AMQP_0_10_APP_ID = "x-amqp-0-10.app-id"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 99154e820f..d79473d72c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -158,7 +158,7 @@ public class AddressHelper } } - public int getNodeType() throws Exception + public int getNodeType() { if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { @@ -175,7 +175,7 @@ public class AddressHelper } else { - throw new Exception("unkown exchange type"); + throw new IllegalArgumentException("unknown exchange type"); } } @@ -211,7 +211,7 @@ public class AddressHelper return (result == null) ? defaultValue : result.booleanValue(); } - public Link getLink() throws Exception + public Link getLink() { Link link = new Link(); link.setSubscription(new Subscription()); @@ -234,7 +234,7 @@ public class AddressHelper } else { - throw new Exception("The reliability mode '" + + throw new IllegalArgumentException("The reliability mode '" + reliability + "' is not yet supported"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 8c23ddad5e..a4c3d20042 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -255,7 +255,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { try { - return AMQDestination.createDestination(str); + return AMQDestination.createDestination(str, false); } catch (Exception e) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java index 68f678c1b8..2edd38ea55 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java @@ -124,7 +124,7 @@ public class AMQMessageDelegate_0_10Test extends QpidTestCase for (Enumeration props = delegate.getPropertyNames(); props.hasMoreElements();) { String key = (String)props.nextElement(); - if (key.equals("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER)) + if (key.equals(QpidMessageProperties.QPID_SUBJECT_JMS_PROPERTY)) { propFound = true; } diff --git a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java index 8775362eb5..edb82796e7 100644 --- a/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java +++ b/qpid/java/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java @@ -305,7 +305,7 @@ public abstract class QpidExceptionHandler implements ExceptionListener } else { - _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination()); + _destination = (AMQDestination)AMQDestination.createDestination(_spec.getDestination(), false); if (destinationTypeString != null && !destinationTypeString.trim().equals("")) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 391498194b..f4b86c7efd 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1212,7 +1212,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase private void replyToTest(String replyTo) throws Exception { Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination replyToDest = AMQDestination.createDestination(replyTo); + Destination replyToDest = AMQDestination.createDestination(replyTo, false); MessageConsumer replyToCons = session.createConsumer(replyToDest); Destination dest = session.createQueue("ADDR:amq.direct/test"); diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java index 097b021b3e..7ceef47573 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java @@ -30,7 +30,6 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; -import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.messaging.Address; @@ -107,7 +106,7 @@ public class MercuryBase controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); dest = createDestination(); - controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR); + controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false); myControlQueue = session.createQueue(myControlQueueAddr); msgType = MessageType.getType(config.getMessageType()); _logger.debug("Using " + msgType + " messages"); @@ -122,7 +121,7 @@ public class MercuryBase { _logger.debug("Prefix : " + prefix); Address addr = Address.parse(config.getAddress()); - AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress()); + AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false); int type = ((AMQSession_0_10)session).resolveAddressType(temp); if ( type == AMQDestination.TOPIC_TYPE) @@ -136,11 +135,11 @@ public class MercuryBase System.out.println("Setting name : " + addr); } - return AMQDestination.createDestination(addr.toString()); + return AMQDestination.createDestination(addr.toString(), false); } else { - return AMQDestination.createDestination(config.getAddress()); + return AMQDestination.createDestination(config.getAddress(), false); } } diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java index 6dd8b7e1ca..4092f0d59d 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -34,10 +34,8 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.tools.TestConfiguration.MessageType; import org.apache.qpid.tools.report.BasicReporter; import org.apache.qpid.tools.report.Reporter; -import org.apache.qpid.tools.report.Statistics.Throughput; import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +105,7 @@ public class QpidReceive implements MessageListener if (config.getReadyAddress() != null) { MessageProducer prod = session.createProducer(AMQDestination - .createDestination(config.getReadyAddress())); + .createDestination(config.getReadyAddress(), false)); prod.send(session.createMessage()); if (_logger.isDebugEnabled()) { @@ -193,7 +191,7 @@ public class QpidReceive implements MessageListener System.out, config.reportEvery(), config.isReportHeader()); - Destination dest = AMQDestination.createDestination(config.getAddress()); + Destination dest = AMQDestination.createDestination(config.getAddress(), false); QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); receiver.setUp(); receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS()); diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java index 3d321dcade..58a643726c 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java @@ -286,7 +286,7 @@ public class QpidSend config.reportEvery(), config.isReportHeader() ); - Destination dest = AMQDestination.createDestination(config.getAddress()); + Destination dest = AMQDestination.createDestination(config.getAddress(), false); QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); sender.setUp(); sender.send(); -- cgit v1.2.1