From d6e512848ca6ac9d25dc86226c13cc086f80afa1 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 7 Dec 2012 08:50:21 +0000 Subject: QPID-4454 : Fixes to allow to/reply-to JMS type to be carried as message annotations in AMQP 1.0 JMS client Merged from trunk r1417368 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1418220 13f79535-47bb-0310-9956-ffa450edef68 --- .../amqp_1_0/jms/impl/ConnectionFactoryImpl.java | 28 +++- .../qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 81 ++++++++- .../qpid/amqp_1_0/jms/impl/DecodedDestination.java | 47 ++++++ .../amqp_1_0/jms/impl/MessageConsumerImpl.java | 2 +- .../apache/qpid/amqp_1_0/jms/impl/MessageImpl.java | 183 +++++++++++++++++---- .../amqp_1_0/jms/impl/MessageProducerImpl.java | 5 +- .../qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java | 2 +- .../qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java | 2 +- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 6 + .../amqp_1_0/jms/impl/TopicSubscriberImpl.java | 2 +- 10 files changed, 316 insertions(+), 42 deletions(-) create mode 100644 qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index 66cfe10771..4856a7c491 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -43,6 +43,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _remoteHost; private boolean _ssl; + private String _queuePrefix; + private String _topicPrefix; public ConnectionFactoryImpl(final String host, final int port, @@ -90,12 +92,15 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection public ConnectionImpl createConnection() throws JMSException { - return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl); + return createConnection(_username, _password); } public ConnectionImpl createConnection(final String username, final String password) throws JMSException { - return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + connection.setQueuePrefix(_queuePrefix); + connection.setTopicPrefix(_topicPrefix); + return connection; } public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException @@ -211,4 +216,23 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection return connection; } + public String getTopicPrefix() + { + return _topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) + { + _topicPrefix = topicPrefix; + } + + public String getQueuePrefix() + { + return _queuePrefix; + } + + public void setQueuePrefix(String queuePrefix) + { + _queuePrefix = queuePrefix; + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 417f6f71e1..be1c2d6514 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transport.Container; import javax.jms.*; import javax.jms.IllegalStateException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import javax.jms.Queue; +import java.util.*; public class ConnectionImpl implements Connection, QueueConnection, TopicConnection { @@ -50,6 +49,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private final String _remoteHost; private final boolean _ssl; private String _clientId; + private String _queuePrefix; + private String _topicPrefix; private static enum State @@ -379,4 +380,78 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { _isTopicConnection = topicConnection; } + + public String getTopicPrefix() + { + return _topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) + { + _topicPrefix = topicPrefix; + } + + public String getQueuePrefix() + { + return _queuePrefix; + } + + public void setQueuePrefix(String queueprefix) + { + _queuePrefix = queueprefix; + } + + DecodedDestination toDecodedDestination(DestinationImpl dest) + { + String address = dest.getAddress(); + Set kind = null; + Class clazz = dest.getClass(); + if( clazz==QueueImpl.class ) + { + kind = MessageImpl.JMS_QUEUE_ATTRIBUTES; + if( _queuePrefix!=null ) + { + // Avoid double prefixing.. + if( !address.startsWith(_queuePrefix) ) + { + address = _queuePrefix+address; + } + } + } + else if( clazz==TopicImpl.class ) + { + kind = MessageImpl.JMS_TOPIC_ATTRIBUTES; + if( _topicPrefix!=null ) + { + // Avoid double prefixing.. + if( !address.startsWith(_topicPrefix) ) + { + address = _topicPrefix+address; + } + } + } + else if( clazz==TemporaryQueueImpl.class ) + { + kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES; + } + else if( clazz==TemporaryTopicImpl.class ) + { + kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES; + } + return new DecodedDestination(address, kind); + } + + DecodedDestination toDecodedDestination(String address, Set kind) + { + if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix)) + { + return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES); + } + if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix)) + { + return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES); + } + return new DecodedDestination(address, kind); + } + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java new file mode 100644 index 0000000000..74e98c2163 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.amqp_1_0.jms.impl; + +import java.util.Set; + +/** +* @author Hiram Chirino +*/ +class DecodedDestination +{ + private final String _address; + private final Set _attributes; + + DecodedDestination(String address, Set kind) + { + _address = address; + _attributes = kind; + } + + public String getAddress() + { + return _address; + } + + public Set getAttributes() + { + return _attributes; + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index eb34cead08..3c15c74d6f 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -127,7 +127,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi { try { - return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO, + return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO, _linkName, _durable, getFilters(), null); } catch (AmqpErrorException e) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java index f1056b94fd..fba50c5477 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java @@ -50,14 +50,24 @@ public abstract class MessageImpl implements Message static final Set _supportedClasses = new HashSet(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class, String.class, byte[].class)); - private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); + static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); + static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type"); + static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type"); + + static final String QUEUE_ATTRIBUTE = "queue"; + static final String TOPIC_ATTRIBUTE = "topic"; + static final String TEMPORARY_ATTRIBUTE = "temporary"; + + static final Set JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE); + static final Set JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE); + static final Set JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE); + static final Set JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE); private Header _header; private Properties _properties; private ApplicationProperties _applicationProperties; private Footer _footer; - public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - private SessionImpl _sessionImpl; + private final SessionImpl _sessionImpl; private boolean _readOnly; private MessageAnnotations _messageAnnotations; @@ -171,45 +181,53 @@ public abstract class MessageImpl implements Message public DestinationImpl getJMSReplyTo() throws JMSException { - return DestinationImpl.valueOf(getReplyTo()); + return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE))); } public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException { - if(destination == null) + if( destination==null ) { setReplyTo(null); - } - else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination) - { - setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress()); + messageAnnotationMap().remove(REPLY_TO_TYPE); } else { - throw new NonAMQPDestinationException(destination); + DecodedDestination dd = toDecodedDestination(destination); + setReplyTo(dd.getAddress()); + messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes())); } } public DestinationImpl getJMSDestination() throws JMSException { - return _isFromQueue ? QueueImpl.valueOf(getTo()) - : _isFromTopic ? TopicImpl.valueOf(getTo()) - : DestinationImpl.valueOf(getTo()); + Set type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE)); + if( type==null ) + { + if( _isFromQueue ) + { + type = JMS_QUEUE_ATTRIBUTES; + } + else if( _isFromTopic ) + { + type = JMS_TOPIC_ATTRIBUTES; + } + } + return toDestination(getTo(), type); } public void setJMSDestination(Destination destination) throws NonAMQPDestinationException { - if(destination == null) + if( destination==null ) { setTo(null); - } - else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination) - { - setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress()); + messageAnnotationMap().remove(TO_TYPE); } else { - throw new NonAMQPDestinationException(destination); + DecodedDestination dd = toDecodedDestination(destination); + setTo(dd.getAddress()); + messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes())); } } @@ -264,22 +282,13 @@ public abstract class MessageImpl implements Message public String getJMSType() throws JMSException { - Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); - final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE); - + final Object attrValue = getMessageAnnotation(JMS_TYPE); return attrValue instanceof String ? attrValue.toString() : null; } public void setJMSType(String s) throws JMSException { - Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); - if(messageAttrs == null) - { - messageAttrs = new HashMap(); - _messageAnnotations = new MessageAnnotations(messageAttrs); - } - - messageAttrs.put(JMS_TYPE, s); + messageAnnotationMap().put(JMS_TYPE, s); } public long getJMSExpiration() throws JMSException @@ -1206,4 +1215,118 @@ public abstract class MessageImpl implements Message } abstract Collection
getSections(); + + DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException + { + if(destination == null) + { + return null; + } + if (destination instanceof DestinationImpl) + { + return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination); + } + throw new NonAMQPDestinationException(destination); + } + + DestinationImpl toDestination(String address, Set kind) + { + if( address == null ) + { + return null; + } + + // If destination prefixes are in play, we have to strip the the prefix, and we might + // be able to infer the kind, if we don't know it yet. + DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind); + address = decoded.getAddress(); + kind = decoded.getAttributes(); + + if( kind == null ) + { + return DestinationImpl.valueOf(address); + } + if( kind.contains(QUEUE_ATTRIBUTE) ) + { + if( kind.contains(TEMPORARY_ATTRIBUTE) ) + { + return new TemporaryQueueImpl(address, null, _sessionImpl); + } + else + { + return QueueImpl.valueOf(address); + } + } + else if ( kind.contains(TOPIC_ATTRIBUTE) ) + { + if( kind.contains(TEMPORARY_ATTRIBUTE) ) + { + return new TemporaryTopicImpl(address, null, _sessionImpl); + } + else + { + return TopicImpl.valueOf(address); + } + } + + return DestinationImpl.valueOf(address); + } + + private Object getMessageAnnotation(Symbol key) + { + Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); + return messageAttrs == null ? null : messageAttrs.get(key); + } + + private Map messageAnnotationMap() + { + Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue(); + if(messageAttrs == null) + { + messageAttrs = new HashMap(); + _messageAnnotations = new MessageAnnotations(messageAttrs); + } + return messageAttrs; + } + + Set splitCommaSeparateSet(String value) + { + if( value == null ) + { + return null; + } + HashSet rc = new HashSet(); + for( String x: value.split("\\s*,\\s*") ) + { + rc.add(x); + } + return rc; + } + + private static Set set(String ...args) + { + HashSet s = new HashSet(); + for (String arg : args) + { + s.add(arg); + } + return Collections.unmodifiableSet(s); + } + + static final String join(String sep, Iterable items) + { + StringBuilder result = new StringBuilder(); + + for (Object o : items) + { + if (result.length() > 0) + { + result.append(sep); + } + result.append(o.toString()); + } + + return result.toString(); + } + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 5bb8845eb7..badc20472b 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.Sender; import org.apache.qpid.amqp_1_0.jms.MessageProducer; -import org.apache.qpid.amqp_1_0.jms.Queue; import org.apache.qpid.amqp_1_0.jms.QueueSender; import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.TopicPublisher; @@ -61,7 +60,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP { try { - _sender = _session.getClientSession().createSender(_destination.getAddress()); + _sender = _session.getClientSession().createSender(_session.toAddress(_destination)); } catch (Sender.SenderCreationException e) { @@ -297,7 +296,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP try { _destination = (DestinationImpl) destination; - _sender = _session.getClientSession().createSender(_destination.getAddress()); + _sender = _session.getClientSession().createSender(_session.toAddress(_destination)); send(message, deliveryMode, priority, ttl); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java index 75003c0d77..8fab315b10 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java @@ -100,7 +100,7 @@ public class QueueBrowserImpl implements QueueBrowser { try { - _receiver = _session.getClientSession().createReceiver(_queue.getAddress(), + _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue), StdDistMode.COPY, AcknowledgeMode.AMO, null, false, diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java index d46ed7183f..67b597f5cf 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java @@ -41,7 +41,7 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei { try { - return getSession().getClientSession().createMovingReceiver(getDestination().getAddress()); + return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination())); } catch (AmqpErrorException e) { diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index cedf9a180a..58b7d4f625 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -899,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession { _isTopicSession = topicSession; } + + String toAddress(DestinationImpl dest) + { + return _connection.toDecodedDestination(dest).getAddress(); + } + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index 52d8c412ec..f267794796 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub { try { - String address = getDestination().getAddress(); + String address = getSession().toAddress(getDestination()); Receiver receiver = getSession().getClientSession().createReceiver(address, StdDistMode.COPY, AcknowledgeMode.ALO, getLinkName(), isDurable(), getFilters(), -- cgit v1.2.1