diff options
Diffstat (limited to 'java/amqp-1-0-client-jms')
17 files changed, 818 insertions, 119 deletions
diff --git a/java/amqp-1-0-client-jms/build.xml b/java/amqp-1-0-client-jms/build.xml index cfc6e0babe..d501be1d8d 100644 --- a/java/amqp-1-0-client-jms/build.xml +++ b/java/amqp-1-0-client-jms/build.xml @@ -24,6 +24,9 @@ <property name="module.depends" value="amqp-1-0-common amqp-1-0-client"/> <property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/> + <property name="example.src.dir" value="${project.root}/amqp-1-0-client-jms/example/src/main/java" /> + <property name="example.jar.file" value="${build.lib}/qpid-amqp-1-0-client-jms-example-${project.version}.jar" /> + <target name="release-bin-copy-readme"> <copy todir="${module.release}" overwrite="true" failonerror="true"> diff --git a/java/amqp-1-0-client-jms/example/build.xml b/java/amqp-1-0-client-jms/example/build.xml new file mode 100644 index 0000000000..cb9ab0994c --- /dev/null +++ b/java/amqp-1-0-client-jms/example/build.xml @@ -0,0 +1,28 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQ 1.0 JMS Client Example" default="build"> + + <property name="module.depends" value="amqp-1-0-client-jms amqp-1-0-client amqp-1-0-common"/> + <property name="module.test.depends" value=""/> + + <import file="../../module.xml"/> + +</project> diff --git a/java/amqp-1-0-client-jms/resources/LICENSE b/java/amqp-1-0-client-jms/resources/LICENSE new file mode 100644 index 0000000000..de4b130f35 --- /dev/null +++ b/java/amqp-1-0-client-jms/resources/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + diff --git a/java/amqp-1-0-client-jms/resources/NOTICE b/java/amqp-1-0-client-jms/resources/NOTICE new file mode 100644 index 0000000000..8d1c3f3122 --- /dev/null +++ b/java/amqp-1-0-client-jms/resources/NOTICE @@ -0,0 +1,5 @@ +Apache Qpid +Copyright 2006-2012 Apache Software Foundation +This product includes software developed at +Apache Software Foundation (http://www.apache.org/) + diff --git a/java/amqp-1-0-client-jms/resources/README.txt b/java/amqp-1-0-client-jms/resources/README.txt new file mode 100644 index 0000000000..35d25050fe --- /dev/null +++ b/java/amqp-1-0-client-jms/resources/README.txt @@ -0,0 +1,7 @@ + +Documentation +-------------- +All of our user documentation can be accessed at: + +http://qpid.apache.org/documentation.html + diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index d9e6dfe36d..4856a7c491 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -20,8 +20,12 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.net.URLConnection; +import java.net.URLDecoder; +import java.net.URLStreamHandler; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; @@ -39,6 +43,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _remoteHost; private boolean _ssl; + private String _queuePrefix; + private String _topicPrefix; public ConnectionFactoryImpl(final String host, final int port, @@ -86,36 +92,70 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection public ConnectionImpl createConnection() throws JMSException { - return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl); + return createConnection(_username, _password); } public ConnectionImpl createConnection(final String username, final String password) throws JMSException { - return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + connection.setQueuePrefix(_queuePrefix); + connection.setTopicPrefix(_topicPrefix); + return connection; } public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException { - URL url = new URL(urlString); + URL url = new URL(null, urlString, new URLStreamHandler() + { + @Override + protected URLConnection openConnection(URL u) throws IOException + { + throw new UnsupportedOperationException(); + } + }); + String protocol = url.getProtocol(); + if(protocol == null || "".equals(protocol)) + { + protocol = "amqp"; + } + else if(!protocol.equals("amqp") && !protocol.equals("amqps")) + { + throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'."); + } String host = url.getHost(); int port = url.getPort(); + + boolean ssl = false; + if(port == -1) { - port = 5672; + if("amqps".equals(protocol)) + { + port = 5671; + ssl = true; + } + else + { + port = 5672; + } } + else if("amqps".equals(protocol)) + { + ssl = true; + } + String userInfo = url.getUserInfo(); String username = null; String password = null; String clientId = null; String remoteHost = null; - boolean ssl = false; if(userInfo != null) { String[] components = userInfo.split(":",2); - username = components[0]; + username = URLDecoder.decode(components[0]); if(components.length == 2) { - password = components[1]; + password = URLDecoder.decode(components[1]); } } String query = url.getQuery(); @@ -139,6 +179,11 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection } } + if(remoteHost == null) + { + remoteHost = host; + } + return new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl); } @@ -170,4 +215,24 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection connection.setTopicConnection(true); return connection; } + + public String getTopicPrefix() + { + return _topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) + { + _topicPrefix = topicPrefix; + } + + public String getQueuePrefix() + { + return _queuePrefix; + } + + public void setQueuePrefix(String queuePrefix) + { + _queuePrefix = queuePrefix; + } } diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 587b12b51a..be1c2d6514 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transport.Container; import javax.jms.*;
import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -43,16 +42,26 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private boolean _isQueueConnection;
private boolean _isTopicConnection;
private final Collection<CloseTask> _closeTasks = new ArrayList<CloseTask>();
+ private final String _host;
+ private final int _port;
+ private final String _username;
+ private final String _password;
+ private final String _remoteHost;
+ private final boolean _ssl;
+ private String _clientId;
+ private String _queuePrefix;
+ private String _topicPrefix;
private static enum State
{
+ UNCONNECTED,
STOPPED,
STARTED,
CLOSED
}
- private volatile State _state = State.STOPPED;
+ private volatile State _state = State.UNCONNECTED;
public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
{
@@ -66,20 +75,52 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
{
- Container container = clientId == null ? new Container() : new Container(clientId);
- // TODO - authentication, containerId, clientId, ssl?, etc
- try
+ _host = host;
+ _port = port;
+ _username = username;
+ _password = password;
+ _clientId = clientId;
+ _remoteHost = remoteHost;
+ _ssl = ssl;
+ }
+
+ private void connect() throws JMSException
+ {
+ synchronized(_lock)
{
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container, remoteHost, ssl);
- // TODO - retrieve negotiated AMQP version
- _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+ // already connected?
+ if( _state == State.UNCONNECTED )
+ {
+ _state = State.STOPPED;
+
+ Container container = _clientId == null ? new Container() : new Container(_clientId);
+ // TODO - authentication, containerId, clientId, ssl?, etc
+ try
+ {
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
+ _port, _username, _password, container, _remoteHost, _ssl);
+ // TODO - retrieve negotiated AMQP version
+ _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
+ }
+ catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.setLinkedException(e);
+ jmsEx.initCause(e);
+ throw jmsEx;
+ }
+ }
}
- catch (org.apache.qpid.amqp_1_0.client.Connection.ConnectionException e)
+ }
+
+ private void checkNotConnected(String msg) throws IllegalStateException
+ {
+ synchronized(_lock)
{
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.setLinkedException(e);
- jmsEx.initCause(e);
- throw jmsEx;
+ if( _state != State.UNCONNECTED )
+ {
+ throw new IllegalStateException(msg);
+ }
}
}
@@ -111,7 +152,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect {
throw new IllegalStateException("Cannot create a session on a closed connection");
}
-
+ connect();
SessionImpl session = new SessionImpl(this, acknowledgeMode);
session.setQueueSession(_isQueueConnection);
session.setTopicSession(_isTopicConnection);
@@ -125,14 +166,19 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public String getClientID() throws JMSException
{
checkClosed();
- return _conn.getEndpoint().getContainer().getId();
+ return _clientId;
}
- public void setClientID(final String s) throws JMSException
+ public void setClientID(final String value) throws JMSException
{
- throw new IllegalStateException("Cannot set client-id to \""
- + s
- + "\"; client-id must be set on connection creation");
+ checkNotConnected("Cannot set client-id to \""
+ + value
+ + "\"; client-id must be set before the connection is used");
+ if( _clientId !=null )
+ {
+ throw new IllegalStateException("client-id has already been set");
+ }
+ _clientId = value;
}
public ConnectionMetaData getMetaData() throws JMSException
@@ -158,6 +204,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect synchronized(_lock)
{
checkClosed();
+ connect();
if(_state == State.STOPPED)
{
// TODO
@@ -187,6 +234,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect {
session.stop();
}
+ case UNCONNECTED:
_state = State.STOPPED;
break;
case CLOSED:
@@ -235,7 +283,9 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect {
task.onClose();
}
- _conn.close();
+ if(_conn != null && _state != State.UNCONNECTED ) {
+ _conn.close();
+ }
_state = State.CLOSED;
}
@@ -282,6 +332,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect final int i) throws JMSException
{
checkClosed();
+ if (_isQueueConnection)
+ {
+ throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
+ }
return null; //TODO
}
@@ -326,4 +380,78 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect {
_isTopicConnection = topicConnection;
}
+
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queueprefix)
+ {
+ _queuePrefix = queueprefix;
+ }
+
+ DecodedDestination toDecodedDestination(DestinationImpl dest)
+ {
+ String address = dest.getAddress();
+ Set<String> kind = null;
+ Class clazz = dest.getClass();
+ if( clazz==QueueImpl.class )
+ {
+ kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+ if( _queuePrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_queuePrefix) )
+ {
+ address = _queuePrefix+address;
+ }
+ }
+ }
+ else if( clazz==TopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+ if( _topicPrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_topicPrefix) )
+ {
+ address = _topicPrefix+address;
+ }
+ }
+ }
+ else if( clazz==TemporaryQueueImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+ }
+ else if( clazz==TemporaryTopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ DecodedDestination toDecodedDestination(String address, Set<String> kind)
+ {
+ if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+ {
+ return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+ }
+ if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+ {
+ return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+ }
+ return new DecodedDestination(address, kind);
+ }
+
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java new file mode 100644 index 0000000000..74e98c2163 --- /dev/null +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.amqp_1_0.jms.impl; + +import java.util.Set; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +class DecodedDestination +{ + private final String _address; + private final Set<String> _attributes; + + DecodedDestination(String address, Set<String> kind) + { + _address = address; + _attributes = kind; + } + + public String getAddress() + { + return _address; + } + + public Set<String> getAttributes() + { + return _attributes; + } +} diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index e0402cd0a7..3c15c74d6f 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -127,7 +127,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi {
try
{
- return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+ return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
_linkName, _durable, getFilters(), null);
}
catch (AmqpErrorException e)
@@ -316,9 +316,9 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi _lastUnackedMessage = deliveryTag;
}
- void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException
+ void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
{
- final int acknowledgeMode = _session.getAcknowledgeMode();
+ int acknowledgeMode = _session.getAckModeEnum().ordinal();
if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
|| acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java index f1056b94fd..fba50c5477 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java @@ -50,14 +50,24 @@ public abstract class MessageImpl implements Message static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
- private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type");
+ static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type");
+
+ static final String QUEUE_ATTRIBUTE = "queue";
+ static final String TOPIC_ATTRIBUTE = "topic";
+ static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+ static final Set<String> JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE);
+ static final Set<String> JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE);
+ static final Set<String> JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
+ static final Set<String> JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
private Header _header;
private Properties _properties;
private ApplicationProperties _applicationProperties;
private Footer _footer;
- public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
- private SessionImpl _sessionImpl;
+ private final SessionImpl _sessionImpl;
private boolean _readOnly;
private MessageAnnotations _messageAnnotations;
@@ -171,45 +181,53 @@ public abstract class MessageImpl implements Message public DestinationImpl getJMSReplyTo() throws JMSException
{
- return DestinationImpl.valueOf(getReplyTo());
+ return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
}
public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
{
- if(destination == null)
+ if( destination==null )
{
setReplyTo(null);
- }
- else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
- {
- setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ messageAnnotationMap().remove(REPLY_TO_TYPE);
}
else
{
- throw new NonAMQPDestinationException(destination);
+ DecodedDestination dd = toDecodedDestination(destination);
+ setReplyTo(dd.getAddress());
+ messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
}
}
public DestinationImpl getJMSDestination() throws JMSException
{
- return _isFromQueue ? QueueImpl.valueOf(getTo())
- : _isFromTopic ? TopicImpl.valueOf(getTo())
- : DestinationImpl.valueOf(getTo());
+ Set<String> type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE));
+ if( type==null )
+ {
+ if( _isFromQueue )
+ {
+ type = JMS_QUEUE_ATTRIBUTES;
+ }
+ else if( _isFromTopic )
+ {
+ type = JMS_TOPIC_ATTRIBUTES;
+ }
+ }
+ return toDestination(getTo(), type);
}
public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
{
- if(destination == null)
+ if( destination==null )
{
setTo(null);
- }
- else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
- {
- setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ messageAnnotationMap().remove(TO_TYPE);
}
else
{
- throw new NonAMQPDestinationException(destination);
+ DecodedDestination dd = toDecodedDestination(destination);
+ setTo(dd.getAddress());
+ messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes()));
}
}
@@ -264,22 +282,13 @@ public abstract class MessageImpl implements Message public String getJMSType() throws JMSException
{
- Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
- final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
-
+ final Object attrValue = getMessageAnnotation(JMS_TYPE);
return attrValue instanceof String ? attrValue.toString() : null;
}
public void setJMSType(String s) throws JMSException
{
- Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
- if(messageAttrs == null)
- {
- messageAttrs = new HashMap();
- _messageAnnotations = new MessageAnnotations(messageAttrs);
- }
-
- messageAttrs.put(JMS_TYPE, s);
+ messageAnnotationMap().put(JMS_TYPE, s);
}
public long getJMSExpiration() throws JMSException
@@ -1206,4 +1215,118 @@ public abstract class MessageImpl implements Message }
abstract Collection<Section> getSections();
+
+ DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException
+ {
+ if(destination == null)
+ {
+ return null;
+ }
+ if (destination instanceof DestinationImpl)
+ {
+ return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination);
+ }
+ throw new NonAMQPDestinationException(destination);
+ }
+
+ DestinationImpl toDestination(String address, Set<String> kind)
+ {
+ if( address == null )
+ {
+ return null;
+ }
+
+ // If destination prefixes are in play, we have to strip the the prefix, and we might
+ // be able to infer the kind, if we don't know it yet.
+ DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind);
+ address = decoded.getAddress();
+ kind = decoded.getAttributes();
+
+ if( kind == null )
+ {
+ return DestinationImpl.valueOf(address);
+ }
+ if( kind.contains(QUEUE_ATTRIBUTE) )
+ {
+ if( kind.contains(TEMPORARY_ATTRIBUTE) )
+ {
+ return new TemporaryQueueImpl(address, null, _sessionImpl);
+ }
+ else
+ {
+ return QueueImpl.valueOf(address);
+ }
+ }
+ else if ( kind.contains(TOPIC_ATTRIBUTE) )
+ {
+ if( kind.contains(TEMPORARY_ATTRIBUTE) )
+ {
+ return new TemporaryTopicImpl(address, null, _sessionImpl);
+ }
+ else
+ {
+ return TopicImpl.valueOf(address);
+ }
+ }
+
+ return DestinationImpl.valueOf(address);
+ }
+
+ private Object getMessageAnnotation(Symbol key)
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ return messageAttrs == null ? null : messageAttrs.get(key);
+ }
+
+ private Map messageAnnotationMap()
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ if(messageAttrs == null)
+ {
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
+ }
+ return messageAttrs;
+ }
+
+ Set<String> splitCommaSeparateSet(String value)
+ {
+ if( value == null )
+ {
+ return null;
+ }
+ HashSet<String> rc = new HashSet<String>();
+ for( String x: value.split("\\s*,\\s*") )
+ {
+ rc.add(x);
+ }
+ return rc;
+ }
+
+ private static Set<String> set(String ...args)
+ {
+ HashSet<String> s = new HashSet<String>();
+ for (String arg : args)
+ {
+ s.add(arg);
+ }
+ return Collections.unmodifiableSet(s);
+ }
+
+ static final String join(String sep, Iterable items)
+ {
+ StringBuilder result = new StringBuilder();
+
+ for (Object o : items)
+ {
+ if (result.length() > 0)
+ {
+ result.append(sep);
+ }
+ result.append(o.toString());
+ }
+
+ return result.toString();
+ }
+
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 5bb8845eb7..badc20472b 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
@@ -61,7 +60,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP {
try
{
- _sender = _session.getClientSession().createSender(_destination.getAddress());
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
}
catch (Sender.SenderCreationException e)
{
@@ -297,7 +296,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP try
{
_destination = (DestinationImpl) destination;
- _sender = _session.getClientSession().createSender(_destination.getAddress());
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
send(message, deliveryMode, priority, ttl);
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java index 2a52b0557a..95c1497d07 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java @@ -40,7 +40,25 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage {
static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
- private Data _objectData;
+ static final Data NULL_OBJECT_DATA;
+ static
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(null);
+ oos.flush();
+ oos.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ NULL_OBJECT_DATA = new Data(new Binary(baos.toByteArray()));
+ }
+
+ private Data _objectData = NULL_OBJECT_DATA;
protected ObjectMessageImpl(Header header,
MessageAnnotations messageAnnotations,
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java index 527e82eaed..8fab315b10 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java @@ -18,9 +18,7 @@ */
package org.apache.qpid.amqp_1_0.jms.impl;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import java.util.*;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
@@ -29,6 +27,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.Filter;
import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
@@ -39,49 +38,27 @@ public class QueueBrowserImpl implements QueueBrowser private static final String JMS_SELECTOR = "jms-selector";
private QueueImpl _queue;
private String _selector;
- private Receiver _receiver;
- private Message _nextElement;
- private MessageEnumeration _enumeration;
+ private final SessionImpl _session;
+ private Map<Symbol, Filter> _filters;
+ private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
+ private boolean _closed;
QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
{
_queue = queue;
_selector = selector;
+ _session = session;
- Map<Symbol, Filter> filters;
if(selector == null || selector.trim().equals(""))
{
- filters = null;
+ _filters = null;
}
else
{
- filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
- }
-
-
- try
- {
- _receiver = session.getClientSession().createReceiver(queue.getAddress(),
- StdDistMode.COPY,
- AcknowledgeMode.AMO,null,
- false,
- filters, null);
- _nextElement = _receiver.receive(0L);
- _enumeration = new MessageEnumeration();
- }
- catch(AmqpErrorException e)
- {
- org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
- if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
- {
- throw new InvalidSelectorException(e.getMessage());
- }
- else
- {
- throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
- }
-
+ _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+ // We do this just to have the server validate the filter..
+ new MessageEnumeration().close();
}
}
@@ -97,42 +74,124 @@ public class QueueBrowserImpl implements QueueBrowser public Enumeration getEnumeration() throws JMSException
{
- if(_enumeration == null)
+ if(_closed)
{
throw new IllegalStateException("Browser has been closed");
}
- return _enumeration;
+ return new MessageEnumeration();
}
public void close() throws JMSException
{
- _receiver.close();
- _enumeration = null;
+ _closed = true;
+ for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
+ {
+ me.close();
+ }
}
- private final class MessageEnumeration implements Enumeration<Message>
+ private final class MessageEnumeration implements Enumeration<MessageImpl>
{
+ private Receiver _receiver;
+ private MessageImpl _nextElement;
+ private boolean _needNext = true;
+
+ MessageEnumeration() throws JMSException
+ {
+ try
+ {
+ _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
+ StdDistMode.COPY,
+ AcknowledgeMode.AMO, null,
+ false,
+ _filters, null);
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
+ catch(AmqpErrorException e)
+ {
+ org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ }
+
+ }
+ _enumerations.add(this);
+
+ }
+
+ public void close()
+ {
+ _enumerations.remove(this);
+ _receiver.close();
+ _receiver = null;
+ }
@Override
public boolean hasMoreElements()
{
+ if( _receiver == null )
+ {
+ return false;
+ }
+ if( _needNext )
+ {
+ _needNext = false;
+ _nextElement = createJMSMessage(_receiver.receive(0L));
+ if( _nextElement == null )
+ {
+ // Drain to verify there really are no more messages.
+ _receiver.drain();
+ _receiver.drainWait();
+ _nextElement = createJMSMessage(_receiver.receive(0L));
+ if( _nextElement == null )
+ {
+ close();
+ }
+ else
+ {
+ // there are still more messages, open up the credit window again..
+ _receiver.clearDrain();
+ }
+ }
+ }
return _nextElement != null;
}
@Override
- public Message nextElement()
+ public MessageImpl nextElement()
{
-
- Message message = _nextElement;
- if(message == null)
+ if( hasMoreElements() )
{
- message = _receiver.receive(0l);
+ MessageImpl message = _nextElement;
+ _nextElement = null;
+ _needNext = true;
+ return message;
}
- if(message != null)
+ else
{
- _nextElement = _receiver.receive(0l);
+ throw new NoSuchElementException();
}
+ }
+ }
+
+ MessageImpl createJMSMessage(final Message msg)
+ {
+ if(msg != null)
+ {
+ final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
+ message.setFromQueue(true);
+ message.setFromTopic(false);
return message;
}
+ else
+ {
+ return null;
+ }
}
+
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java index d46ed7183f..67b597f5cf 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java @@ -41,7 +41,7 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei {
try
{
- return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+ return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
}
catch (AmqpErrorException e)
{
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index e321245a0e..58b7d4f625 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -766,7 +766,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession {
while(!_closed)
{
- while(!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))
+ while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
{
try
{
@@ -777,7 +777,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession return;
}
}
- while(_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))
+ while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
{
Message msg;
@@ -804,6 +804,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession if(message != null)
{
+ if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+ consumer.setLastUnackedMessage(msg.getDeliveryTag());
+ }
_currentConsumer = consumer;
_currentMessage = msg;
try
@@ -816,11 +820,11 @@ public class SessionImpl implements Session, QueueSession, TopicSession _currentMessage = null;
}
- if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
- || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE))
+ if(_recoveredMessage == null)
{
- consumer.acknowledge(msg);
+ consumer.preReceiveAction(msg);
}
+
}
}
@@ -895,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession {
_isTopicSession = topicSession;
}
+
+ String toAddress(DestinationImpl dest)
+ {
+ return _connection.toDecodedDestination(dest).getAddress();
+ }
+
}
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index 52d8c412ec..f267794796 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub {
try
{
- String address = getDestination().getAddress();
+ String address = getSession().toAddress(getDestination());
Receiver receiver = getSession().getClientSession().createReceiver(address,
StdDistMode.COPY, AcknowledgeMode.ALO,
getLinkName(), isDurable(), getFilters(),
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java index 31030a7d30..091ab41304 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/jndi/PropertiesFileInitialContextFactory.java @@ -57,6 +57,9 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor Map data = new ConcurrentHashMap(); String file = null; + String fileName = (environment.containsKey(Context.PROVIDER_URL)) + ? (String)environment.get(Context.PROVIDER_URL) : System.getProperty(Context.PROVIDER_URL); + try { |