summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
commitb84a68ff43c5c4e3e05fb6d0ce57f7c0a9099fb1 (patch)
tree2867fab56872f3b9d5913f7feb8088e7818e03eb
parent4956baf2bfa881f24c0ecbac3469135a0aef2d2d (diff)
downloadqpid-python-b84a68ff43c5c4e3e05fb6d0ce57f7c0a9099fb1.tar.gz
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1556873 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java21
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java12
-rw-r--r--java/amqp-1-0-client-websocket/build.xml39
-rw-r--r--java/amqp-1-0-client-websocket/pom.xml163
-rw-r--r--java/amqp-1-0-client-websocket/resources/LICENSE204
-rw-r--r--java/amqp-1-0-client-websocket/resources/NOTICE5
-rw-r--r--java/amqp-1-0-client-websocket/resources/README.txt7
-rw-r--r--java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java290
-rw-r--r--java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java42
-rw-r--r--java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory19
-rw-r--r--java/amqp-1-0-client/pom.xml11
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java310
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java39
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java196
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java35
-rw-r--r--java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java29
-rw-r--r--java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory19
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java15
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java81
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java (renamed from java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java)4
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java12
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java115
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java35
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java27
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java140
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java41
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java55
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java38
-rw-r--r--java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory19
-rw-r--r--java/broker-plugins/websocket/build.xml32
-rw-r--r--java/broker-plugins/websocket/pom.xml158
-rw-r--r--java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java294
-rw-r--r--java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java51
-rw-r--r--java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java53
-rw-r--r--java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory19
-rw-r--r--java/build.deps1
-rw-r--r--java/build.xml2
-rw-r--r--java/ivy.nexus.xml6
-rw-r--r--java/module.xml12
-rw-r--r--java/pom.xml2
42 files changed, 2308 insertions, 356 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index f58a78e7e3..f72c9b3020 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -36,6 +36,7 @@ import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
{
+ private final String _protocol;
private String _host;
private int _port;
private String _username;
@@ -98,6 +99,20 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
final boolean ssl,
final int maxSessions)
{
+ this(ssl?"amqps":"amqp",host,port,username,password,clientId,remoteHost,ssl,maxSessions);
+ }
+
+ public ConnectionFactoryImpl(final String protocol,
+ final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId,
+ final String remoteHost,
+ final boolean ssl,
+ final int maxSessions)
+ {
+ _protocol = protocol;
_host = host;
_port = port;
_username = username;
@@ -115,7 +130,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
public ConnectionImpl createConnection(final String username, final String password) throws JMSException
{
- ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions);
+ ConnectionImpl connection = new ConnectionImpl(_protocol,_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions);
connection.setQueuePrefix(_queuePrefix);
connection.setTopicPrefix(_topicPrefix);
connection.setUseBinaryMessageId(_useBinaryMessageId);
@@ -138,10 +153,12 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
{
protocol = "amqp";
}
+/*
else if(!protocol.equals("amqp") && !protocol.equals("amqps"))
{
throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'.");
}
+*/
String host = url.getHost();
int port = url.getPort();
@@ -226,7 +243,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
}
ConnectionFactoryImpl connectionFactory =
- new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions);
+ new ConnectionFactoryImpl(protocol,host, port, username, password, clientId, remoteHost, ssl, maxSessions);
connectionFactory.setUseBinaryMessageId(binaryMessageId);
connectionFactory.setSyncPublish(syncPublish);
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index f3ae849dac..55bc8e4f96 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
+ private final String _protocol;
private ConnectionMetaData _connectionMetaData;
private volatile ExceptionListener _exceptionListener;
@@ -87,8 +88,15 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
this(host, port, username, password, clientId, remoteHost, ssl,0);
}
+
public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
{
+ this(ssl?"amqps":"amqp",host,port,username,password,clientId,remoteHost,ssl,maxSessions);
+ }
+
+ public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
+ {
+ _protocol = protocol;
_host = host;
_port = port;
_username = username;
@@ -109,10 +117,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
_state = State.STOPPED;
Container container = _clientId == null ? new Container() : new Container(_clientId);
- // TODO - authentication, containerId, clientId, ssl?, etc
+
try
{
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(_protocol, _host,
_port, _username, _password, container, _remoteHost, _ssl,
_maxSessions - 1);
_conn.setConnectionErrorTask(new ConnectionErrorTask());
diff --git a/java/amqp-1-0-client-websocket/build.xml b/java/amqp-1-0-client-websocket/build.xml
new file mode 100644
index 0000000000..2d538aab66
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/build.xml
@@ -0,0 +1,39 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="AMQP 1.0 Client WebSocket transport" default="build">
+
+ <property name="module.genpom" value="true"/>
+ <property name="module.depends" value="amqp-1-0-common amqp-1-0-client"/>
+ <property name="release.exclude.module.deps" value="true"/>
+
+ <import file="../module.xml"/>
+
+ <target name="release-bin-copy-readme">
+ <copy todir="${module.release}" overwrite="true" failonerror="true">
+ <fileset file="${basedir}/README.txt" />
+ </copy>
+ </target>
+
+ <target name="release-bin-other" depends="release-bin-copy-readme"/>
+
+ <target name="release-bin" depends="release-bin-tasks"/>
+
+</project>
diff --git a/java/amqp-1-0-client-websocket/pom.xml b/java/amqp-1-0-client-websocket/pom.xml
new file mode 100644
index 0000000000..205e0d5ab7
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-project</artifactId>
+ <version>0.26-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>qpid-amqp-1-0-client-websocket</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-amqp-1-0-client</artifactId>
+ <version>0.26-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-amqp-1-0-common</artifactId>
+ <version>0.26-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-servlet_2.5_spec</artifactId>
+ <version>1.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-websocket</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ </build>
+
+</project>
diff --git a/java/amqp-1-0-client-websocket/resources/LICENSE b/java/amqp-1-0-client-websocket/resources/LICENSE
new file mode 100644
index 0000000000..de4b130f35
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/resources/LICENSE
@@ -0,0 +1,204 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
diff --git a/java/amqp-1-0-client-websocket/resources/NOTICE b/java/amqp-1-0-client-websocket/resources/NOTICE
new file mode 100644
index 0000000000..8d1c3f3122
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/resources/NOTICE
@@ -0,0 +1,5 @@
+Apache Qpid
+Copyright 2006-2012 Apache Software Foundation
+This product includes software developed at
+Apache Software Foundation (http://www.apache.org/)
+
diff --git a/java/amqp-1-0-client-websocket/resources/README.txt b/java/amqp-1-0-client-websocket/resources/README.txt
new file mode 100644
index 0000000000..35d25050fe
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/resources/README.txt
@@ -0,0 +1,7 @@
+
+Documentation
+--------------
+All of our user documentation can be accessed at:
+
+http://qpid.apache.org/documentation.html
+
diff --git a/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
new file mode 100644
index 0000000000..6c35e555ca
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
@@ -0,0 +1,290 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client.websocket;
+
+import org.apache.qpid.amqp_1_0.client.ConnectionException;
+import org.apache.qpid.amqp_1_0.client.TransportProvider;
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+class WebSocketProvider implements TransportProvider
+{
+ public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
+
+ private static final byte AMQP_HEADER_FRAME_TYPE = (byte) 222;
+ private static int _connections;
+ private final String _transport;
+ private static WebSocketClientFactory _factory;
+
+ public WebSocketProvider(final String transport)
+ {
+ _transport = transport;
+ }
+
+ private static synchronized WebSocketClient createWebSocketClient() throws Exception
+ {
+ if(_factory == null)
+ {
+ _factory = new WebSocketClientFactory();
+ _factory.start();
+ }
+ _connections++;
+ return _factory.newWebSocketClient();
+ }
+
+ private static synchronized void removeClient() throws Exception
+ {
+ if(--_connections == 0)
+ {
+ _factory.stop();
+ _factory = null;
+ }
+ }
+
+ @Override
+ public void connect(final ConnectionEndpoint conn,
+ final String address,
+ final int port,
+ final boolean ssl,
+ final ExceptionHandler exceptionHandler) throws ConnectionException
+ {
+
+ try
+ {
+ WebSocketClient client = createWebSocketClient();
+ // Configure the client
+ client.setProtocol(AMQP_WEBSOCKET_SUBPROTOCOL);
+
+
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn);
+
+ final ConnectionHandler.FrameSource src;
+
+ if(conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn);
+
+ src = new ConnectionHandler.SequentialFrameSource(new HeaderFrameSource((byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ saslOut,
+ new HeaderFrameSource((byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ out);
+
+ conn.setSaslFrameOutput(saslOut);
+ }
+ else
+ {
+ src = new ConnectionHandler.SequentialFrameSource(new HeaderFrameSource((byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ out);
+ }
+
+ final ConnectionHandler handler = new ConnectionHandler(conn);
+ conn.setFrameOutputHandler(out);
+ final URI uri = new URI(_transport +"://"+ address+":"+ port +"/");
+ WebSocket.Connection connection = client.open(uri, new WebSocket.OnBinaryMessage()
+ {
+ public void onOpen(Connection connection)
+ {
+
+ Thread outputThread = new Thread(new FrameOutputThread(connection, src, conn, exceptionHandler));
+ outputThread.setDaemon(true);
+ outputThread.start();
+ }
+
+ public void onClose(int closeCode, String message)
+ {
+ conn.inputClosed();
+ }
+
+ @Override
+ public void onMessage(final byte[] data, final int offset, final int length)
+ {
+ handler.parse(ByteBuffer.wrap(data,offset,length).slice());
+ }
+ }).get(5, TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ throw new ConnectionException(e);
+ }
+
+ }
+
+
+
+ public static class HeaderFrameSource implements ConnectionHandler.FrameSource
+ {
+
+ private final ByteBuffer _buffer;
+ private boolean _closed;
+
+ public HeaderFrameSource(byte... headerBytes)
+ {
+ _buffer = ByteBuffer.wrap(headerBytes);
+ }
+
+
+ @Override
+ public AMQFrame getNextFrame(final boolean wait)
+ {
+ if(_closed)
+ {
+ return null;
+ }
+ else
+ {
+ _closed = true;
+ return new HeaderFrame(_buffer);
+ }
+ }
+
+ public boolean closed()
+ {
+ return _closed;
+ }
+
+ }
+
+
+ private static class HeaderFrame extends AMQFrame
+ {
+
+ public HeaderFrame(final ByteBuffer buffer)
+ {
+ super(null,buffer);
+ }
+
+ @Override
+ public short getChannel()
+ {
+ return 0;
+ }
+
+ @Override
+ public byte getFrameType()
+ {
+ return AMQP_HEADER_FRAME_TYPE;
+ }
+ }
+
+ private class FrameOutputThread implements Runnable
+ {
+ private final WebSocket.Connection _connection;
+ private final ConnectionHandler.FrameSource _frameSource;
+ private final ExceptionHandler _exceptionHandler;
+ private final FrameWriter _frameWriter;
+ private final byte[] _buffer;
+
+ public FrameOutputThread(final WebSocket.Connection connection,
+ final ConnectionHandler.FrameSource src,
+ final ConnectionEndpoint conn,
+ final ExceptionHandler exceptionHandler)
+ {
+ _connection = connection;
+ _frameSource = src;
+ _exceptionHandler = exceptionHandler;
+ _frameWriter = new FrameWriter(conn.getDescribedTypeRegistry());
+ _buffer = new byte[conn.getMaxFrameSize()];
+ }
+
+ @Override
+ public void run()
+ {
+
+ final FrameWriter frameWriter = _frameWriter;
+ final ByteBuffer buffer = ByteBuffer.wrap(_buffer);
+ try
+ {
+
+ while(_connection.isOpen() && !_frameSource.closed())
+ {
+ AMQFrame frame = _frameSource.getNextFrame(true);
+ if(frame instanceof HeaderFrame)
+ {
+ _connection.sendMessage(frame.getPayload().array(),
+ frame.getPayload().arrayOffset(),
+ frame.getPayload().remaining());
+ }
+ else if(frame != null)
+ {
+ frameWriter.setValue(frame);
+ buffer.clear();
+ int length = frameWriter.writeToBuffer(buffer);
+ _connection.sendMessage(_buffer,0,length);
+ }
+ }
+ if(_frameSource.closed() && _connection.isOpen())
+ {
+ _connection.close();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ removeClient();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java
new file mode 100644
index 0000000000..0e0dfa14d6
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client.websocket;
+
+import org.apache.qpid.amqp_1_0.client.TransportProvider;
+import org.apache.qpid.amqp_1_0.client.TransportProviderFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class WebSocketTransportProviderFactory implements TransportProviderFactory
+{
+ @Override
+ public Collection<String> getSupportedTransports()
+ {
+ return Arrays.asList("ws", "wss");
+ }
+
+ @Override
+ public TransportProvider getProvider(final String transport)
+ {
+ return new WebSocketProvider(transport);
+ }
+}
diff --git a/java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory
new file mode 100644
index 0000000000..b5993fd7b0
--- /dev/null
+++ b/java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.amqp_1_0.client.websocket.WebSocketTransportProviderFactory \ No newline at end of file
diff --git a/java/amqp-1-0-client/pom.xml b/java/amqp-1-0-client/pom.xml
index 13430c0c22..1d6a444d6d 100644
--- a/java/amqp-1-0-client/pom.xml
+++ b/java/amqp-1-0-client/pom.xml
@@ -35,6 +35,17 @@
</dependencies>
<build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>resources/</include>
+ </includes>
+ </resource>
+ </resources>
</build>
</project>
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index fc0ff427a2..b2d86c4dbc 100644
--- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -20,26 +20,15 @@
*/
package org.apache.qpid.amqp_1_0.client;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
import java.security.Principal;
+import java.util.ServiceLoader;
import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.Predicate;
-import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
@@ -47,9 +36,8 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-public class Connection implements SocketExceptionHandler
+public class Connection implements ExceptionHandler
{
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
private static final int MAX_FRAME_SIZE = 65536;
private String _address;
@@ -148,6 +136,20 @@ public class Connection implements SocketExceptionHandler
}
+ public Connection(final String protocol,
+ final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final String remoteHost,
+ final boolean ssl,
+ final int channelMax) throws ConnectionException
+ {
+ this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl,
+ channelMax);
+ }
+
public Connection(final String address,
final int port,
final String username,
@@ -158,141 +160,107 @@ public class Connection implements SocketExceptionHandler
boolean ssl,
int channelMax) throws ConnectionException
{
+ this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container,remoteHostname,ssl,channelMax);
+ }
- _address = address;
+ public Connection(final String protocol,
+ final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname,
+ boolean ssl,
+ int channelMax) throws ConnectionException
+ {
- try
- {
- final Socket s;
- if(ssl)
- {
- s = SSLSocketFactory.getDefault().createSocket(address, port);
- }
- else
- {
- s = new Socket(address, port);
- }
+ _address = address;
- Principal principal = username == null ? null : new Principal()
- {
+ Principal principal = username == null ? null : new Principal()
+ {
- public String getName()
- {
- return username;
- }
- };
- _conn = new ConnectionEndpoint(container, principal, password);
- if(channelMax >= 0)
+ public String getName()
{
- _conn.setChannelMax((short)channelMax);
+ return username;
}
- _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
- _conn.setRemoteAddress(s.getRemoteSocketAddress());
- _conn.setRemoteHostname(remoteHostname);
-
-
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
-
- final OutputStream outputStream = s.getOutputStream();
-
- ConnectionHandler.BytesSource src;
+ };
+ _conn = new ConnectionEndpoint(container, principal, password);
+ if(channelMax >= 0)
+ {
+ _conn.setChannelMax((short)channelMax);
+ }
+ _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+ _conn.setRemoteHostname(remoteHostname);
- if(_conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
-
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
-
- _conn.setSaslFrameOutput(saslOut);
- }
- else
- {
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
- }
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
+ ConnectionHandler.BytesSource src;
- ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this);
- Thread outputThread = new Thread(outputHandler);
- outputThread.setDaemon(true);
- outputThread.start();
- _conn.setFrameOutputHandler(out);
+ if(_conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
+
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+
+ _conn.setSaslFrameOutput(saslOut);
+ }
+ else
+ {
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+ }
+ TransportProvider transportProvider = getTransportProvider(protocol);
+ transportProvider.connect(_conn,address,port,ssl, this);
- final ConnectionHandler handler = new ConnectionHandler(_conn);
- final InputStream inputStream = s.getInputStream();
- Thread inputThread = new Thread(new Runnable()
- {
+ _conn.open();
- public void run()
- {
- try
- {
- doRead(handler, inputStream);
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- synchronized (outputStream)
- {
- s.close();
- }
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-
- inputThread.setDaemon(true);
- inputThread.start();
+ }
- _conn.open();
+ private TransportProvider getTransportProvider(final String protocol) throws ConnectionException
+ {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader);
- }
- catch (IOException e)
+ for(TransportProviderFactory tpf : providerFactories)
{
- throw new ConnectionException(e);
+ if(tpf.getSupportedTransports().contains(protocol))
+ {
+ return tpf.getProvider(protocol);
+ }
}
-
+ throw new ConnectionException("Unknown protocol: " + protocol);
}
private Connection(ConnectionEndpoint endpoint)
@@ -301,45 +269,6 @@ public class Connection implements SocketExceptionHandler
}
- private void doRead(final AMQPTransport transport, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
- ByteBuffer bbuf = ByteBuffer.wrap(buf);
- final Object lock = new Object();
- transport.setInputStateChangeListener(new StateChangeListener(){
-
- public void onStateChange(final boolean active)
- {
- synchronized(lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- try
- {
- int read;
- while((read = inputStream.read(buf)) != -1)
- {
- bbuf.position(0);
- bbuf.limit(read);
-
- while(bbuf.hasRemaining() && transport.isOpenForInput())
- {
- transport.processBytes(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- }
-
public Session createSession() throws ConnectionException
{
checkNotClosed();
@@ -373,47 +302,6 @@ public class Connection implements SocketExceptionHandler
}
- private void doRead(final ConnectionHandler handler, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
-
-
- try
- {
- int read;
- boolean done = false;
- while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
- {
- ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
- Binary b = new Binary(buf,0,read);
-
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
- }
- while(bbuf.hasRemaining() && !handler.isDone())
- {
- handler.parse(bbuf);
- }
-
-
- }
- if(!handler.isDone())
- {
- _conn.inputClosed();
- if(_conn.getConnectionEventListener() != null)
- {
- _conn.getConnectionEventListener().closeReceived();
- }
- }
- }
- catch (IOException e)
- {
- _conn.inputClosed();
- e.printStackTrace();
- }
- }
-
public void close() throws ConnectionErrorException
{
_conn.close();
@@ -465,7 +353,7 @@ public class Connection implements SocketExceptionHandler
}
@Override
- public void processSocketException(Exception exception)
+ public void handleException(Exception exception)
{
Error socketError = new Error();
socketError.setDescription(exception.getClass() + ": " + exception.getMessage());
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java
new file mode 100644
index 0000000000..2327a3860a
--- /dev/null
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class TCPTransportProviderFactory implements TransportProviderFactory
+{
+ @Override
+ public Collection<String> getSupportedTransports()
+ {
+ return Arrays.asList("amqp","amqps");
+ }
+
+ @Override
+ public TransportProvider getProvider(final String transport)
+ {
+ return new TCPTransportProvier(transport);
+ }
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
new file mode 100644
index 0000000000..1c5eb0a34c
--- /dev/null
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+class TCPTransportProvier implements TransportProvider
+{
+ private final String _transport;
+
+ public TCPTransportProvier(final String transport)
+ {
+ _transport = transport;
+ }
+
+ @Override
+ public void connect(final ConnectionEndpoint conn,
+ final String address,
+ final int port,
+ final boolean ssl,
+ final ExceptionHandler exceptionHandler) throws ConnectionException
+ {
+ try
+ {
+ final Socket s;
+ if(ssl)
+ {
+ s = SSLSocketFactory.getDefault().createSocket(address, port);
+ }
+ else
+ {
+ s = new Socket(address, port);
+ }
+
+ conn.setRemoteAddress(s.getRemoteSocketAddress());
+
+
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn);
+
+ ConnectionHandler.BytesSource src;
+
+ if(conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn);
+
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.HeaderBytesSource(conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry())
+ );
+
+ conn.setSaslFrameOutput(saslOut);
+ }
+ else
+ {
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn,(byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry())
+ );
+ }
+
+
+ final OutputStream outputStream = s.getOutputStream();
+ ConnectionHandler.BytesOutputHandler outputHandler =
+ new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler);
+ Thread outputThread = new Thread(outputHandler);
+ outputThread.setDaemon(true);
+ outputThread.start();
+ conn.setFrameOutputHandler(out);
+
+
+ final ConnectionHandler handler = new ConnectionHandler(conn);
+ final InputStream inputStream = s.getInputStream();
+
+ Thread inputThread = new Thread(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ doRead(conn, handler, inputStream);
+ }
+ finally
+ {
+ if(conn.closedForInput() && conn.closedForOutput())
+ {
+ try
+ {
+ synchronized (outputStream)
+ {
+ s.close();
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+ });
+
+ inputThread.setDaemon(true);
+ inputThread.start();
+
+ }
+ catch (IOException e)
+ {
+ throw new ConnectionException(e);
+ }
+ }
+ private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+
+
+ try
+ {
+ int read;
+ boolean done = false;
+ while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
+ {
+ ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
+ while(bbuf.hasRemaining() && !handler.isDone())
+ {
+ handler.parse(bbuf);
+ }
+
+
+ }
+ if(!handler.isDone())
+ {
+ conn.inputClosed();
+ if(conn.getConnectionEventListener() != null)
+ {
+ conn.getConnectionEventListener().closeReceived();
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ conn.inputClosed();
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
new file mode 100644
index 0000000000..2430b0e14b
--- /dev/null
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+
+public interface TransportProvider
+{
+ void connect(ConnectionEndpoint conn,
+ String address,
+ int port,
+ boolean ssl,
+ ExceptionHandler exceptionHandler) throws ConnectionException;
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java
new file mode 100644
index 0000000000..82999c5ccc
--- /dev/null
+++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import java.util.Collection;
+
+public interface TransportProviderFactory
+{
+ Collection<String> getSupportedTransports();
+ TransportProvider getProvider(String transport);
+}
diff --git a/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory
new file mode 100644
index 0000000000..ffde030b30
--- /dev/null
+++ b/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.amqp_1_0.client.TCPTransportProviderFactory \ No newline at end of file
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
index b846b16722..b96e1ab47b 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
@@ -66,20 +66,7 @@ public class SymbolTypeConstructor extends VariableWidthTypeConstructor
if(symbolVal == null)
{
ByteBuffer dup = in.duplicate();
- try
- {
- dup.limit(in.position()+size);
- }
- catch (IllegalArgumentException e)
- {
- System.err.println("in.position(): " + in.position());
- System.err.println("size: " + size);
- System.err.println("dup.position(): " + dup.position());
- System.err.println("dup.capacity(): " + dup.capacity());
- System.err.println("dup.limit(): " + dup.limit());
- throw e;
-
- }
+ dup.limit(in.position()+size);
CharBuffer charBuf = ASCII.decode(dup);
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
index 119dd6bf3a..54a4f22d48 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
@@ -65,6 +65,12 @@ public class ConnectionHandler
public boolean parse(ByteBuffer in)
{
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ Binary b = new Binary(in.array(),in.arrayOffset()+in.position(),in.remaining());
+ RAW_LOGGER.fine("RECV [" + _connection.getRemoteAddress() + "] : " + b.toString());
+ }
+
while(in.hasRemaining() && !isDone())
{
_delegate = _delegate.parse(in);
@@ -376,6 +382,47 @@ public class ConnectionHandler
}
+ public static class SequentialFrameSource implements FrameSource
+ {
+ private Queue<FrameSource> _sources = new LinkedList<FrameSource>();
+
+ public SequentialFrameSource(FrameSource... sources)
+ {
+ _sources.addAll(Arrays.asList(sources));
+ }
+
+ public synchronized void addSource(FrameSource source)
+ {
+ _sources.add(source);
+ }
+
+ @Override
+ public synchronized AMQFrame getNextFrame(final boolean wait)
+ {
+ FrameSource src = _sources.peek();
+ while (src != null && src.closed())
+ {
+ _sources.poll();
+ src = _sources.peek();
+ }
+
+ if(src != null)
+ {
+ return src.getNextFrame(wait);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean closed()
+ {
+ return _sources.isEmpty();
+ }
+ }
+
+
public static class BytesOutputHandler implements Runnable, BytesProcessor
{
@@ -383,28 +430,28 @@ public class ConnectionHandler
private BytesSource _bytesSource;
private boolean _closed;
private ConnectionEndpoint _conn;
- private SocketExceptionHandler _exceptionHandler;
-
- public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
- {
- _outputStream = outputStream;
- _bytesSource = source;
- _conn = conn;
- _exceptionHandler = exceptionHandler;
- }
+ private ExceptionHandler _exceptionHandler;
- public void run()
- {
+ public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, ExceptionHandler exceptionHandler)
+ {
+ _outputStream = outputStream;
+ _bytesSource = source;
+ _conn = conn;
+ _exceptionHandler = exceptionHandler;
+ }
- final BytesSource bytesSource = _bytesSource;
+ public void run()
+ {
- while(!(_closed || bytesSource.closed()))
- {
- _bytesSource.getBytes(this, true);
- }
+ final BytesSource bytesSource = _bytesSource;
+ while(!(_closed || bytesSource.closed()))
+ {
+ _bytesSource.getBytes(this, true);
}
+ }
+
public void processBytes(final ByteBuffer buf)
{
try
@@ -423,7 +470,7 @@ public class ConnectionHandler
catch (IOException e)
{
_closed = true;
- _exceptionHandler.processSocketException(e);
+ _exceptionHandler.handleException(e);
}
}
}
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java
index 540aee0f8d..3adf3c0b18 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java
@@ -23,9 +23,9 @@ package org.apache.qpid.amqp_1_0.framing;
/**
* Callback interface for processing socket exceptions.
*/
-public interface SocketExceptionHandler
+public interface ExceptionHandler
{
- public void processSocketException(Exception exception);
+ public void handleException(Exception exception);
}
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 34ca851978..c37c52c6ea 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -580,19 +580,7 @@ public class SessionEndpoint
if(payload != null && payloadSent < payload.remaining())
{
payload = payload.duplicate();
-try
-{
payload.position(payload.position()+payloadSent);
-}
-catch(IllegalArgumentException e)
-{
- System.err.println("UNEXPECTED");
- System.err.println("Payload Position: " + payload.position());
- System.err.println("Payload Sent: " + payloadSent);
- System.err.println("Payload Remaining: " + payload.remaining());
- throw e;
-
-}
Transfer secondTransfer = new Transfer();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java
index ae6e5ac43a..7338e5046a 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java
@@ -25,7 +25,10 @@ import java.util.EnumSet;
public enum Transport
{
TCP,
- SSL;
+ SSL,
+ WS,
+ WSS,
+ SCTP;
public static Transport valueOfObject(Object transportObject)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
index a4ce95e5aa..806e1f600a 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
@@ -24,7 +24,6 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -47,16 +46,17 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.TransportProvider;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
public class AmqpPortAdapter extends PortAdapter
{
private final Broker _broker;
- private IncomingNetworkTransport _transport;
+ private AcceptingTransport _transport;
public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor)
{
@@ -70,42 +70,36 @@ public class AmqpPortAdapter extends PortAdapter
Collection<Transport> transports = getTransports();
Set<AmqpProtocolVersion> supported = convertFromModelProtocolsToAmqp(getProtocols());
- SSLContext sslContext = null;
- if (transports.contains(Transport.SSL))
+ TransportProvider transportProvider = null;
+ final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
+ for(TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class))
{
- sslContext = createSslContext();
+ if(tpf.getSupportedTransports().contains(transports))
+ {
+ transportProvider = tpf.getTransportProvider(transportSet);
+ }
}
- AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
-
- String bindingAddress = (String) getAttribute(Port.BINDING_ADDRESS);
- if (WILDCARD_ADDRESS.equals(bindingAddress))
+ if(transportProvider == null)
{
- bindingAddress = null;
+ throw new IllegalConfigurationException("No transport providers found which can satisfy the requirement to support the transports: " + transports);
}
- Integer port = (Integer) getAttribute(Port.PORT);
- InetSocketAddress bindingSocketAddress = null;
- if ( bindingAddress == null )
- {
- bindingSocketAddress = new InetSocketAddress(port);
- }
- else
+
+ SSLContext sslContext = null;
+ if (transports.contains(Transport.SSL))
{
- bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
+ sslContext = createSslContext();
}
- final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(
- bindingSocketAddress, (Boolean)getAttribute(TCP_NO_DELAY),
- (Integer)getAttribute(SEND_BUFFER_SIZE), (Integer)getAttribute(RECEIVE_BUFFER_SIZE),
- (Boolean)getAttribute(NEED_CLIENT_AUTH), (Boolean)getAttribute(WANT_CLIENT_AUTH));
+ AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
- _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
- final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(
- _broker, transports.contains(Transport.TCP) ? sslContext : null,
- settings.wantClientAuth(), settings.needClientAuth(),
- supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
+ _transport = transportProvider.createTransport(transportSet,
+ sslContext,
+ this,
+ supported,
+ defaultSupportedProtocolReply);
- _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext);
+ _transport.start();
for(Transport transport : getTransports())
{
CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
@@ -211,65 +205,6 @@ public class AmqpPortAdapter extends PortAdapter
return null;
}
- class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
- {
- private final InetSocketAddress _bindingSocketAddress;
- private final Boolean _tcpNoDelay;
- private final Integer _sendBufferSize;
- private final Integer _receiveBufferSize;
- private final boolean _needClientAuth;
- private final boolean _wantClientAuth;
-
- public ServerNetworkTransportConfiguration(
- InetSocketAddress bindingSocketAddress, boolean tcpNoDelay,
- int sendBufferSize, int receiveBufferSize,
- boolean needClientAuth, boolean wantClientAuth)
- {
- _bindingSocketAddress = bindingSocketAddress;
- _tcpNoDelay = tcpNoDelay;
- _sendBufferSize = sendBufferSize;
- _receiveBufferSize = receiveBufferSize;
- _needClientAuth = needClientAuth;
- _wantClientAuth = wantClientAuth;
- }
-
- @Override
- public boolean wantClientAuth()
- {
- return _wantClientAuth;
- }
-
- @Override
- public boolean needClientAuth()
- {
- return _needClientAuth;
- }
-
- @Override
- public Boolean getTcpNoDelay()
- {
- return _tcpNoDelay;
- }
-
- @Override
- public Integer getSendBufferSize()
- {
- return _sendBufferSize;
- }
-
- @Override
- public Integer getReceiveBufferSize()
- {
- return _receiveBufferSize;
- }
-
- @Override
- public InetSocketAddress getAddress()
- {
- return _bindingSocketAddress;
- }
- };
-
public String toString()
{
return getName();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java
new file mode 100644
index 0000000000..81a6adc40e
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import java.util.Set;
+
+public interface TransportProviderFactory extends Pluggable
+{
+ Set<Set<Transport>> getSupportedTransports();
+
+ TransportProvider getTransportProvider(Set<Transport> transports);
+
+
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 47b578c4ef..b2b585f692 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.SSLStatus;
@@ -274,9 +275,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public void received(ByteBuffer msg)
{
-
_lastReadTime = System.currentTimeMillis();
- ByteBuffer msgheader = msg.duplicate();
+ ByteBuffer msgheader = msg.duplicate().slice();
+
if(_header.remaining() > msgheader.limit())
{
msg.position(msg.limit());
@@ -329,6 +330,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
+
if(newDelegate == null && looksLikeSSL(headerBytes))
{
if(_sslContext != null)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java
new file mode 100644
index 0000000000..bf7f2835e0
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+public interface AcceptingTransport
+{
+ public void start();
+ public void close();
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
new file mode 100644
index 0000000000..9aef42e6d0
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.Set;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+class TCPandSSLTransport implements AcceptingTransport
+{
+ private IncomingNetworkTransport _networkTransport;
+ private Set<Transport> _transports;
+ private SSLContext _sslContext;
+ private InetSocketAddress _bindingSocketAddress;
+ private Port _port;
+ private Set<AmqpProtocolVersion> _supported;
+ private AmqpProtocolVersion _defaultSupportedProtocolReply;
+
+ TCPandSSLTransport(final Set<Transport> transports,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ _transports = transports;
+ _sslContext = sslContext;
+ _port = port;
+ _supported = supported;
+ _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
+ }
+
+ @Override
+ public void start()
+ {
+ String bindingAddress = (String) _port.getAttribute(Port.BINDING_ADDRESS);
+ if (WILDCARD_ADDRESS.equals(bindingAddress))
+ {
+ bindingAddress = null;
+ }
+ Integer port = (Integer) _port.getAttribute(Port.PORT);
+ if ( bindingAddress == null )
+ {
+ _bindingSocketAddress = new InetSocketAddress(port);
+ }
+ else
+ {
+ _bindingSocketAddress = new InetSocketAddress(bindingAddress, port);
+ }
+
+ final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
+ _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ final MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(
+ _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
+ settings.wantClientAuth(), settings.needClientAuth(),
+ _supported,
+ _defaultSupportedProtocolReply,
+ _port,
+ _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
+
+ _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+ }
+
+ @Override
+ public void close()
+ {
+ _networkTransport.close();
+ }
+
+ class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
+ {
+ public ServerNetworkTransportConfiguration()
+ {
+ }
+
+ @Override
+ public boolean wantClientAuth()
+ {
+ return (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH);
+ }
+
+ @Override
+ public boolean needClientAuth()
+ {
+ return (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH);
+ }
+
+ @Override
+ public Boolean getTcpNoDelay()
+ {
+ return (Boolean)_port.getAttribute(Port.TCP_NO_DELAY);
+ }
+
+ @Override
+ public Integer getSendBufferSize()
+ {
+ return (Integer)_port.getAttribute(Port.SEND_BUFFER_SIZE);
+ }
+
+ @Override
+ public Integer getReceiveBufferSize()
+ {
+ return (Integer)_port.getAttribute(Port.RECEIVE_BUFFER_SIZE);
+ }
+
+ @Override
+ public InetSocketAddress getAddress()
+ {
+ return _bindingSocketAddress;
+ }
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java
new file mode 100644
index 0000000000..33c7774a29
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+
+import javax.net.ssl.SSLContext;
+import java.util.Set;
+
+class TCPandSSLTransportProvider implements TransportProvider
+{
+ @Override
+ public AcceptingTransport createTransport(final Set<Transport> transports,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ return new TCPandSSLTransport(transports, sslContext, port, supported, defaultSupportedProtocolReply);
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java
new file mode 100644
index 0000000000..9b61d1d037
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TCPandSSLTransportProviderFactory implements TransportProviderFactory
+{
+
+ private static final String TYPE = "TCPandSSL";
+
+ @Override
+ public Set<Set<Transport>> getSupportedTransports()
+ {
+ return new HashSet<Set<Transport>>(Arrays.asList(EnumSet.of(Transport.TCP),
+ EnumSet.of(Transport.SSL),
+ EnumSet.of(Transport.TCP,Transport.SSL)));
+ }
+
+ @Override
+ public TransportProvider getTransportProvider(final Set<Transport> transports)
+ {
+ return new TCPandSSLTransportProvider();
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java
new file mode 100644
index 0000000000..86b1e31727
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.Set;
+
+public interface TransportProvider
+{
+ AcceptingTransport createTransport(Set<Transport> transports,
+ SSLContext sslContext,
+ Port port,
+ Set<AmqpProtocolVersion> supported,
+ AmqpProtocolVersion defaultSupportedProtocolReply);
+}
diff --git a/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
new file mode 100644
index 0000000000..3838a9c39f
--- /dev/null
+++ b/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.transport.TCPandSSLTransportProviderFactory \ No newline at end of file
diff --git a/java/broker-plugins/websocket/build.xml b/java/broker-plugins/websocket/build.xml
new file mode 100644
index 0000000000..fc3dd3b846
--- /dev/null
+++ b/java/broker-plugins/websocket/build.xml
@@ -0,0 +1,32 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+<project name="Qpid Broker-Plugins Websocket Transport" default="build">
+ <property name="module.depends" value="common broker-core" />
+ <property name="module.test.depends" value="qpid-test-utils broker-core/tests" />
+
+ <property name="module.genpom" value="true"/>
+ <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/>
+
+ <property name="broker.plugin" value="true"/>
+ <property name="broker-plugins-websocket.libs" value="" />
+
+ <import file="../../module.xml" />
+
+ <target name="bundle" depends="bundle-tasks"/>
+</project>
diff --git a/java/broker-plugins/websocket/pom.xml b/java/broker-plugins/websocket/pom.xml
new file mode 100644
index 0000000000..2029bd33aa
--- /dev/null
+++ b/java/broker-plugins/websocket/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>qpid-project</artifactId>
+ <groupId>org.apache.qpid</groupId>
+ <version>0.26-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>qpid-broker-plugins-websocket</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>0.26-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-servlet_2.5_spec</artifactId>
+ <version>1.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-websocket</artifactId>
+ <version>7.6.10.v20130312</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
new file mode 100644
index 0000000000..b44ed70040
--- /dev/null
+++ b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -0,0 +1,294 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ssl.SslSocketConnector;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketHandler;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.Set;
+
+class WebSocketProvider implements AcceptingTransport
+{
+ public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10";
+ private final Transport _transport;
+ private final SSLContext _sslContext;
+ private final Port _port;
+ private final Set<AmqpProtocolVersion> _supported;
+ private final AmqpProtocolVersion _defaultSupportedProtocolReply;
+ private final ProtocolEngineFactory _factory;
+ private Server _server;
+
+ WebSocketProvider(final Transport transport,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ _transport = transport;
+ _sslContext = sslContext;
+ _port = port;
+ _supported = supported;
+ _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
+ _factory = new MultiVersionProtocolEngineFactory(
+ _port.getParent(Broker.class), null,
+ (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH),
+ (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH),
+ _supported,
+ _defaultSupportedProtocolReply,
+ _port,
+ _transport);
+
+ }
+
+ @Override
+ public void start()
+ {
+ _server = new Server();
+
+ Connector connector = null;
+
+
+ if (_transport == Transport.WS)
+ {
+ connector = new SelectChannelConnector();
+ }
+ else if (_transport == Transport.WSS)
+ {
+ SslContextFactory factory = new SslContextFactory();
+ factory.setSslContext(_sslContext);
+ connector = new SslSocketConnector(factory);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport);
+ }
+ String bindingAddress = _port.getBindingAddress();
+ if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
+ {
+ connector.setHost(bindingAddress.trim());
+ }
+ connector.setPort(_port.getPort());
+ _server.addConnector(connector);
+
+ WebSocketHandler wshandler = new WebSocketHandler()
+ {
+ @Override
+ public WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol)
+ {
+ SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort());
+ SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort());
+ return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress) : null;
+ }
+ };
+
+ _server.setHandler(wshandler);
+ try
+ {
+ _server.start();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ private class AmqpWebSocket implements WebSocket,WebSocket.OnBinaryMessage
+ {
+ private final SocketAddress _localAddress;
+ private final SocketAddress _remoteAddress;
+ private Connection _connection;
+ private final Transport _transport;
+ private ProtocolEngine _engine;
+
+ private AmqpWebSocket(final Transport transport,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress)
+ {
+ _transport = transport;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public void onMessage(final byte[] data, final int offset, final int length)
+ {
+ _engine.received(ByteBuffer.wrap(data, offset, length).slice());
+ }
+
+ @Override
+ public void onOpen(final Connection connection)
+ {
+ _connection = connection;
+
+ _engine = _factory.newProtocolEngine();
+
+ final NetworkConnection connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress);
+ _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender());
+
+ }
+
+ @Override
+ public void onClose(final int closeCode, final String message)
+ {
+ _engine.closed();
+ }
+ }
+
+ private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer>
+ {
+ private final WebSocket.Connection _connection;
+ private final SocketAddress _localAddress;
+ private final SocketAddress _remoteAddress;
+ private Principal _principal;
+ private int _maxWriteIdle;
+ private int _maxReadIdle;
+
+ public ConnectionWrapper(final WebSocket.Connection connection,
+ final SocketAddress localAddress,
+ final SocketAddress remoteAddress)
+ {
+ _connection = connection;
+ _localAddress = localAddress;
+ _remoteAddress = remoteAddress;
+ }
+
+ @Override
+ public Sender<ByteBuffer> getSender()
+ {
+ return this;
+ }
+
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
+ public void setIdleTimeout(final int i)
+ {
+
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ try
+ {
+ _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining());
+ }
+ catch (IOException e)
+ {
+ close();
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+ _connection.close();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _remoteAddress;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _localAddress;
+ }
+
+ @Override
+ public void setMaxWriteIdle(final int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ @Override
+ public void setMaxReadIdle(final int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public void setPeerPrincipal(final Principal principal)
+ {
+ _principal = principal;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ return _principal;
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+ }
+}
diff --git a/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java
new file mode 100644
index 0000000000..02d1100315
--- /dev/null
+++ b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import javax.net.ssl.SSLContext;
+import java.util.Set;
+
+class WebSocketTransportProvider implements TransportProvider
+{
+ public WebSocketTransportProvider()
+ {
+ }
+
+ @Override
+ public AcceptingTransport createTransport(final Set<Transport> transports,
+ final SSLContext sslContext,
+ final Port port,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedProtocolReply)
+ {
+ return new WebSocketProvider(transports.iterator().next(),
+ sslContext,
+ port,
+ supported,
+ defaultSupportedProtocolReply);
+ }
+}
diff --git a/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java
new file mode 100644
index 0000000000..662f16ce5b
--- /dev/null
+++ b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport.websocket;
+
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.plugin.TransportProviderFactory;
+import org.apache.qpid.server.transport.TransportProvider;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class WebSocketTransportProviderFactory implements TransportProviderFactory
+{
+
+ private static final String TYPE = "Websocket";
+
+ @Override
+ public Set<Set<Transport>> getSupportedTransports()
+ {
+ return Collections.singleton((Set<Transport>)EnumSet.of(Transport.WS));
+ }
+
+ @Override
+ public TransportProvider getTransportProvider(final Set<Transport> transports)
+ {
+ return new WebSocketTransportProvider();
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+}
diff --git a/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
new file mode 100644
index 0000000000..55b88cc7be
--- /dev/null
+++ b/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.transport.websocket.WebSocketTransportProviderFactory \ No newline at end of file
diff --git a/java/build.deps b/java/build.deps
index 17e637cc97..58dea7009e 100644
--- a/java/build.deps
+++ b/java/build.deps
@@ -74,6 +74,7 @@ amqp-1-0-common.libs=
amqp-1-0-client.libs=
amqp-1-0-client-example.libs=${commons-cli}
amqp-1-0-client-jms.libs=${geronimo-jms}
+amqp-1-0-client-websocket.libs = ${jetty} ${jetty-continuation} ${jetty-security} ${jetty-http} ${jetty-io} ${jetty-servlet} ${jetty-util} ${servlet-api} ${jetty-websocket}
tools.libs=${commons-configuration.libs} ${log4j}
broker-core.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
${xalan} ${derby-db} ${commons-configuration.libs} \
diff --git a/java/build.xml b/java/build.xml
index e761677f6d..219029b908 100644
--- a/java/build.xml
+++ b/java/build.xml
@@ -34,7 +34,7 @@
<findSubProjects name="broker-plugins" dir="broker-plugins" excludes="${broker-plugins-exclude}"/>
<findSubProjects name="client-plugins" dir="client-plugins"/>
- <property name="modules.core" value="qpid-test-utils common management/common amqp-1-0-common broker-core broker client amqp-1-0-client amqp-1-0-client-jms tools"/>
+ <property name="modules.core" value="qpid-test-utils common management/common amqp-1-0-common broker-core broker client amqp-1-0-client amqp-1-0-client-jms amqp-1-0-client-websocket tools"/>
<property name="modules.examples" value="client/example management/example amqp-1-0-client/example amqp-1-0-client-jms/example"/>
<property name="modules.tests" value="systests perftests"/>
<property name="modules.plugin" value="${broker-plugins} ${client-plugins}"/>
diff --git a/java/ivy.nexus.xml b/java/ivy.nexus.xml
index d492169ff3..ee0ec27d48 100644
--- a/java/ivy.nexus.xml
+++ b/java/ivy.nexus.xml
@@ -135,6 +135,12 @@
<artifact name="qpid-amqp-1-0-client-jms" type="jar.asc" ext="jar.asc"/>
<artifact name="qpid-amqp-1-0-client-jms" type="source" ext="jar" e:classifier="sources"/>
<artifact name="qpid-amqp-1-0-client-jms" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+ <artifact name="qpid-amqp-1-0-client-websocket" type="pom" ext="pom"/>
+ <artifact name="qpid-amqp-1-0-client-websocket" type="pom.asc" ext="pom.asc"/>
+ <artifact name="qpid-amqp-1-0-client-websocket" type="jar" ext="jar"/>
+ <artifact name="qpid-amqp-1-0-client-websocket" type="jar.asc" ext="jar.asc"/>
+ <artifact name="qpid-amqp-1-0-client-websocket" type="source" ext="jar" e:classifier="sources"/>
+ <artifact name="qpid-amqp-1-0-client-websocket" type="source.asc" ext="jar.asc" e:classifier="sources"/>
<artifact name="qpid-management-common" type="pom" ext="pom"/>
<artifact name="qpid-management-common" type="pom.asc" ext="pom.asc"/>
<artifact name="qpid-management-common" type="jar" ext="jar"/>
diff --git a/java/module.xml b/java/module.xml
index 28c31f967d..6d3f08a386 100644
--- a/java/module.xml
+++ b/java/module.xml
@@ -584,8 +584,8 @@
<copylist todir="${build.lib}" dir="${project.root}" files="${module.libs}"/>
</target>
-
- <target name="libs-release" description="copy dependencies into module release">
+
+ <target name="libs-release-basic" description="copy dependencies into module release">
<!-- Copy the module dependencies -->
<echo message="${module.libs}"/>
<copylist todir="${module.release}/lib" dir="${project.root}" files="${module.libs}"/>
@@ -594,10 +594,18 @@
<!-- Copy the jar for this module -->
<copy todir="${module.release}/lib" failonerror="true">
<fileset file="${module.jar}"/>
+ </copy>
+ </target>
+
+ <target name="libs-release-module-depends" description="copy dependencies into module release" unless="release.exclude.module.deps">
+ <copy todir="${module.release}/lib" failonerror="true">
<fileset dir="${build.lib}" includes="${module.depends.jars}"/>
</copy>
</target>
+ <target name="libs-release" description="copy dependencies into module release" depends="libs-release-basic,libs-release-module-depends">
+ </target>
+
<target name="resources" description="copy resources into build tree">
<copy todir="${build}" failonerror="false" flatten="true">
<fileset dir="${basedir}${file.separator}.." includes="${resources}"/>
diff --git a/java/pom.xml b/java/pom.xml
index 3fd4f4a0ab..1b313b3514 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -137,6 +137,7 @@
<modules>
<module>amqp-1-0-client</module>
<module>amqp-1-0-client-jms</module>
+ <module>amqp-1-0-client-websocket</module>
<module>amqp-1-0-common</module>
<module>broker</module>
<module>broker-core</module>
@@ -153,6 +154,7 @@
<module>broker-plugins/management-http</module>
<module>broker-plugins/management-jmx</module>
<module>broker-plugins/memory-store</module>
+ <module>broker-plugins/websocket</module>
<module>common</module>
<module>client</module>
<module>management/common</module>