diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
commit | b84a68ff43c5c4e3e05fb6d0ce57f7c0a9099fb1 (patch) | |
tree | 2867fab56872f3b9d5913f7feb8088e7818e03eb | |
parent | 4956baf2bfa881f24c0ecbac3469135a0aef2d2d (diff) | |
download | qpid-python-b84a68ff43c5c4e3e05fb6d0ce57f7c0a9099fb1.tar.gz |
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1556873 13f79535-47bb-0310-9956-ffa450edef68
42 files changed, 2308 insertions, 356 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index f58a78e7e3..f72c9b3020 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -36,6 +36,7 @@ import org.apache.qpid.amqp_1_0.jms.ConnectionFactory; public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory { + private final String _protocol; private String _host; private int _port; private String _username; @@ -98,6 +99,20 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection final boolean ssl, final int maxSessions) { + this(ssl?"amqps":"amqp",host,port,username,password,clientId,remoteHost,ssl,maxSessions); + } + + public ConnectionFactoryImpl(final String protocol, + final String host, + final int port, + final String username, + final String password, + final String clientId, + final String remoteHost, + final boolean ssl, + final int maxSessions) + { + _protocol = protocol; _host = host; _port = port; _username = username; @@ -115,7 +130,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection public ConnectionImpl createConnection(final String username, final String password) throws JMSException { - ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions); + ConnectionImpl connection = new ConnectionImpl(_protocol,_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions); connection.setQueuePrefix(_queuePrefix); connection.setTopicPrefix(_topicPrefix); connection.setUseBinaryMessageId(_useBinaryMessageId); @@ -138,10 +153,12 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { protocol = "amqp"; } +/* else if(!protocol.equals("amqp") && !protocol.equals("amqps")) { throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'."); } +*/ String host = url.getHost(); int port = url.getPort(); @@ -226,7 +243,7 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection } ConnectionFactoryImpl connectionFactory = - new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions); + new ConnectionFactoryImpl(protocol,host, port, username, password, clientId, remoteHost, ssl, maxSessions); connectionFactory.setUseBinaryMessageId(binaryMessageId); connectionFactory.setSyncPublish(syncPublish); diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index f3ae849dac..55bc8e4f96 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; public class ConnectionImpl implements Connection, QueueConnection, TopicConnection { + private final String _protocol; private ConnectionMetaData _connectionMetaData; private volatile ExceptionListener _exceptionListener; @@ -87,8 +88,15 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect this(host, port, username, password, clientId, remoteHost, ssl,0); } + public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException { + this(ssl?"amqps":"amqp",host,port,username,password,clientId,remoteHost,ssl,maxSessions); + } + + public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException + { + _protocol = protocol; _host = host; _port = port; _username = username; @@ -109,10 +117,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _state = State.STOPPED; Container container = _clientId == null ? new Container() : new Container(_clientId); - // TODO - authentication, containerId, clientId, ssl?, etc + try { - _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host, + _conn = new org.apache.qpid.amqp_1_0.client.Connection(_protocol, _host, _port, _username, _password, container, _remoteHost, _ssl, _maxSessions - 1); _conn.setConnectionErrorTask(new ConnectionErrorTask()); diff --git a/java/amqp-1-0-client-websocket/build.xml b/java/amqp-1-0-client-websocket/build.xml new file mode 100644 index 0000000000..2d538aab66 --- /dev/null +++ b/java/amqp-1-0-client-websocket/build.xml @@ -0,0 +1,39 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQP 1.0 Client WebSocket transport" default="build"> + + <property name="module.genpom" value="true"/> + <property name="module.depends" value="amqp-1-0-common amqp-1-0-client"/> + <property name="release.exclude.module.deps" value="true"/> + + <import file="../module.xml"/> + + <target name="release-bin-copy-readme"> + <copy todir="${module.release}" overwrite="true" failonerror="true"> + <fileset file="${basedir}/README.txt" /> + </copy> + </target> + + <target name="release-bin-other" depends="release-bin-copy-readme"/> + + <target name="release-bin" depends="release-bin-tasks"/> + +</project> diff --git a/java/amqp-1-0-client-websocket/pom.xml b/java/amqp-1-0-client-websocket/pom.xml new file mode 100644 index 0000000000..205e0d5ab7 --- /dev/null +++ b/java/amqp-1-0-client-websocket/pom.xml @@ -0,0 +1,163 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-project</artifactId> + <version>0.26-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>qpid-amqp-1-0-client-websocket</artifactId> + + <dependencies> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client</artifactId> + <version>0.26-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-common</artifactId> + <version>0.26-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-servlet_2.5_spec</artifactId> + <version>1.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-continuation</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-continuation</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-websocket</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + </build> + +</project> diff --git a/java/amqp-1-0-client-websocket/resources/LICENSE b/java/amqp-1-0-client-websocket/resources/LICENSE new file mode 100644 index 0000000000..de4b130f35 --- /dev/null +++ b/java/amqp-1-0-client-websocket/resources/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + diff --git a/java/amqp-1-0-client-websocket/resources/NOTICE b/java/amqp-1-0-client-websocket/resources/NOTICE new file mode 100644 index 0000000000..8d1c3f3122 --- /dev/null +++ b/java/amqp-1-0-client-websocket/resources/NOTICE @@ -0,0 +1,5 @@ +Apache Qpid +Copyright 2006-2012 Apache Software Foundation +This product includes software developed at +Apache Software Foundation (http://www.apache.org/) + diff --git a/java/amqp-1-0-client-websocket/resources/README.txt b/java/amqp-1-0-client-websocket/resources/README.txt new file mode 100644 index 0000000000..35d25050fe --- /dev/null +++ b/java/amqp-1-0-client-websocket/resources/README.txt @@ -0,0 +1,7 @@ + +Documentation +-------------- +All of our user documentation can be accessed at: + +http://qpid.apache.org/documentation.html + diff --git a/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java new file mode 100644 index 0000000000..6c35e555ca --- /dev/null +++ b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java @@ -0,0 +1,290 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client.websocket; + +import org.apache.qpid.amqp_1_0.client.ConnectionException; +import org.apache.qpid.amqp_1_0.client.TransportProvider; +import org.apache.qpid.amqp_1_0.codec.FrameWriter; +import org.apache.qpid.amqp_1_0.framing.AMQFrame; +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.FrameBody; +import org.apache.qpid.amqp_1_0.type.SaslFrameBody; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketClient; +import org.eclipse.jetty.websocket.WebSocketClientFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +class WebSocketProvider implements TransportProvider +{ + public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10"; + + private static final byte AMQP_HEADER_FRAME_TYPE = (byte) 222; + private static int _connections; + private final String _transport; + private static WebSocketClientFactory _factory; + + public WebSocketProvider(final String transport) + { + _transport = transport; + } + + private static synchronized WebSocketClient createWebSocketClient() throws Exception + { + if(_factory == null) + { + _factory = new WebSocketClientFactory(); + _factory.start(); + } + _connections++; + return _factory.newWebSocketClient(); + } + + private static synchronized void removeClient() throws Exception + { + if(--_connections == 0) + { + _factory.stop(); + _factory = null; + } + } + + @Override + public void connect(final ConnectionEndpoint conn, + final String address, + final int port, + final boolean ssl, + final ExceptionHandler exceptionHandler) throws ConnectionException + { + + try + { + WebSocketClient client = createWebSocketClient(); + // Configure the client + client.setProtocol(AMQP_WEBSOCKET_SUBPROTOCOL); + + + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn); + + final ConnectionHandler.FrameSource src; + + if(conn.requiresSASL()) + { + ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn); + + src = new ConnectionHandler.SequentialFrameSource(new HeaderFrameSource((byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + saslOut, + new HeaderFrameSource((byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + out); + + conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialFrameSource(new HeaderFrameSource((byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + out); + } + + final ConnectionHandler handler = new ConnectionHandler(conn); + conn.setFrameOutputHandler(out); + final URI uri = new URI(_transport +"://"+ address+":"+ port +"/"); + WebSocket.Connection connection = client.open(uri, new WebSocket.OnBinaryMessage() + { + public void onOpen(Connection connection) + { + + Thread outputThread = new Thread(new FrameOutputThread(connection, src, conn, exceptionHandler)); + outputThread.setDaemon(true); + outputThread.start(); + } + + public void onClose(int closeCode, String message) + { + conn.inputClosed(); + } + + @Override + public void onMessage(final byte[] data, final int offset, final int length) + { + handler.parse(ByteBuffer.wrap(data,offset,length).slice()); + } + }).get(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + throw new ConnectionException(e); + } + + } + + + + public static class HeaderFrameSource implements ConnectionHandler.FrameSource + { + + private final ByteBuffer _buffer; + private boolean _closed; + + public HeaderFrameSource(byte... headerBytes) + { + _buffer = ByteBuffer.wrap(headerBytes); + } + + + @Override + public AMQFrame getNextFrame(final boolean wait) + { + if(_closed) + { + return null; + } + else + { + _closed = true; + return new HeaderFrame(_buffer); + } + } + + public boolean closed() + { + return _closed; + } + + } + + + private static class HeaderFrame extends AMQFrame + { + + public HeaderFrame(final ByteBuffer buffer) + { + super(null,buffer); + } + + @Override + public short getChannel() + { + return 0; + } + + @Override + public byte getFrameType() + { + return AMQP_HEADER_FRAME_TYPE; + } + } + + private class FrameOutputThread implements Runnable + { + private final WebSocket.Connection _connection; + private final ConnectionHandler.FrameSource _frameSource; + private final ExceptionHandler _exceptionHandler; + private final FrameWriter _frameWriter; + private final byte[] _buffer; + + public FrameOutputThread(final WebSocket.Connection connection, + final ConnectionHandler.FrameSource src, + final ConnectionEndpoint conn, + final ExceptionHandler exceptionHandler) + { + _connection = connection; + _frameSource = src; + _exceptionHandler = exceptionHandler; + _frameWriter = new FrameWriter(conn.getDescribedTypeRegistry()); + _buffer = new byte[conn.getMaxFrameSize()]; + } + + @Override + public void run() + { + + final FrameWriter frameWriter = _frameWriter; + final ByteBuffer buffer = ByteBuffer.wrap(_buffer); + try + { + + while(_connection.isOpen() && !_frameSource.closed()) + { + AMQFrame frame = _frameSource.getNextFrame(true); + if(frame instanceof HeaderFrame) + { + _connection.sendMessage(frame.getPayload().array(), + frame.getPayload().arrayOffset(), + frame.getPayload().remaining()); + } + else if(frame != null) + { + frameWriter.setValue(frame); + buffer.clear(); + int length = frameWriter.writeToBuffer(buffer); + _connection.sendMessage(_buffer,0,length); + } + } + if(_frameSource.closed() && _connection.isOpen()) + { + _connection.close(); + } + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + try + { + removeClient(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + } +} diff --git a/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java new file mode 100644 index 0000000000..0e0dfa14d6 --- /dev/null +++ b/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketTransportProviderFactory.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client.websocket; + +import org.apache.qpid.amqp_1_0.client.TransportProvider; +import org.apache.qpid.amqp_1_0.client.TransportProviderFactory; + +import java.util.Arrays; +import java.util.Collection; + +public class WebSocketTransportProviderFactory implements TransportProviderFactory +{ + @Override + public Collection<String> getSupportedTransports() + { + return Arrays.asList("ws", "wss"); + } + + @Override + public TransportProvider getProvider(final String transport) + { + return new WebSocketProvider(transport); + } +} diff --git a/java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory new file mode 100644 index 0000000000..b5993fd7b0 --- /dev/null +++ b/java/amqp-1-0-client-websocket/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.amqp_1_0.client.websocket.WebSocketTransportProviderFactory
\ No newline at end of file diff --git a/java/amqp-1-0-client/pom.xml b/java/amqp-1-0-client/pom.xml index 13430c0c22..1d6a444d6d 100644 --- a/java/amqp-1-0-client/pom.xml +++ b/java/amqp-1-0-client/pom.xml @@ -35,6 +35,17 @@ </dependencies> <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + <resource> + <directory>src/main/java</directory> + <includes> + <include>resources/</include> + </includes> + </resource> + </resources> </build> </project> diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index fc0ff427a2..b2d86c4dbc 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,26 +20,15 @@ */ package org.apache.qpid.amqp_1_0.client; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; import java.security.Principal; +import java.util.ServiceLoader; import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.net.ssl.SSLSocketFactory; - -import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; -import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.Predicate; -import org.apache.qpid.amqp_1_0.transport.StateChangeListener; -import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; @@ -47,9 +36,8 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.Error; -public class Connection implements SocketExceptionHandler +public class Connection implements ExceptionHandler { - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); private static final int MAX_FRAME_SIZE = 65536; private String _address; @@ -148,6 +136,20 @@ public class Connection implements SocketExceptionHandler } + public Connection(final String protocol, + final String address, + final int port, + final String username, + final String password, + final Container container, + final String remoteHost, + final boolean ssl, + final int channelMax) throws ConnectionException + { + this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl, + channelMax); + } + public Connection(final String address, final int port, final String username, @@ -158,141 +160,107 @@ public class Connection implements SocketExceptionHandler boolean ssl, int channelMax) throws ConnectionException { + this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container,remoteHostname,ssl,channelMax); + } - _address = address; + public Connection(final String protocol, + final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname, + boolean ssl, + int channelMax) throws ConnectionException + { - try - { - final Socket s; - if(ssl) - { - s = SSLSocketFactory.getDefault().createSocket(address, port); - } - else - { - s = new Socket(address, port); - } + _address = address; - Principal principal = username == null ? null : new Principal() - { + Principal principal = username == null ? null : new Principal() + { - public String getName() - { - return username; - } - }; - _conn = new ConnectionEndpoint(container, principal, password); - if(channelMax >= 0) + public String getName() { - _conn.setChannelMax((short)channelMax); + return username; } - _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); - _conn.setRemoteAddress(s.getRemoteSocketAddress()); - _conn.setRemoteHostname(remoteHostname); - - - - ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn); - - - final OutputStream outputStream = s.getOutputStream(); - - ConnectionHandler.BytesSource src; + }; + _conn = new ConnectionEndpoint(container, principal, password); + if(channelMax >= 0) + { + _conn.setChannelMax((short)channelMax); + } + _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); + _conn.setRemoteHostname(remoteHostname); - if(_conn.requiresSASL()) - { - ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn); - - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)3, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), - new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) - ); - - _conn.setSaslFrameOutput(saslOut); - } - else - { - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) - ); - } + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn); + ConnectionHandler.BytesSource src; - ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this); - Thread outputThread = new Thread(outputHandler); - outputThread.setDaemon(true); - outputThread.start(); - _conn.setFrameOutputHandler(out); + if(_conn.requiresSASL()) + { + ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn); + + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), + new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + + _conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + } + TransportProvider transportProvider = getTransportProvider(protocol); + transportProvider.connect(_conn,address,port,ssl, this); - final ConnectionHandler handler = new ConnectionHandler(_conn); - final InputStream inputStream = s.getInputStream(); - Thread inputThread = new Thread(new Runnable() - { + _conn.open(); - public void run() - { - try - { - doRead(handler, inputStream); - } - finally - { - if(_conn.closedForInput() && _conn.closedForOutput()) - { - try - { - synchronized (outputStream) - { - s.close(); - } - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - }); - - inputThread.setDaemon(true); - inputThread.start(); + } - _conn.open(); + private TransportProvider getTransportProvider(final String protocol) throws ConnectionException + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader); - } - catch (IOException e) + for(TransportProviderFactory tpf : providerFactories) { - throw new ConnectionException(e); + if(tpf.getSupportedTransports().contains(protocol)) + { + return tpf.getProvider(protocol); + } } - + throw new ConnectionException("Unknown protocol: " + protocol); } private Connection(ConnectionEndpoint endpoint) @@ -301,45 +269,6 @@ public class Connection implements SocketExceptionHandler } - private void doRead(final AMQPTransport transport, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - ByteBuffer bbuf = ByteBuffer.wrap(buf); - final Object lock = new Object(); - transport.setInputStateChangeListener(new StateChangeListener(){ - - public void onStateChange(final boolean active) - { - synchronized(lock) - { - lock.notifyAll(); - } - } - }); - - try - { - int read; - while((read = inputStream.read(buf)) != -1) - { - bbuf.position(0); - bbuf.limit(read); - - while(bbuf.hasRemaining() && transport.isOpenForInput()) - { - transport.processBytes(bbuf); - } - - - } - } - catch (IOException e) - { - e.printStackTrace(); - } - - } - public Session createSession() throws ConnectionException { checkNotClosed(); @@ -373,47 +302,6 @@ public class Connection implements SocketExceptionHandler } - private void doRead(final ConnectionHandler handler, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - - - try - { - int read; - boolean done = false; - while(!handler.isDone() && (read = inputStream.read(buf)) != -1) - { - ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); - Binary b = new Binary(buf,0,read); - - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); - } - while(bbuf.hasRemaining() && !handler.isDone()) - { - handler.parse(bbuf); - } - - - } - if(!handler.isDone()) - { - _conn.inputClosed(); - if(_conn.getConnectionEventListener() != null) - { - _conn.getConnectionEventListener().closeReceived(); - } - } - } - catch (IOException e) - { - _conn.inputClosed(); - e.printStackTrace(); - } - } - public void close() throws ConnectionErrorException { _conn.close(); @@ -465,7 +353,7 @@ public class Connection implements SocketExceptionHandler } @Override - public void processSocketException(Exception exception) + public void handleException(Exception exception) { Error socketError = new Error(); socketError.setDescription(exception.getClass() + ": " + exception.getMessage()); diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java new file mode 100644 index 0000000000..2327a3860a --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.util.Arrays; +import java.util.Collection; + +public class TCPTransportProviderFactory implements TransportProviderFactory +{ + @Override + public Collection<String> getSupportedTransports() + { + return Arrays.asList("amqp","amqps"); + } + + @Override + public TransportProvider getProvider(final String transport) + { + return new TCPTransportProvier(transport); + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java new file mode 100644 index 0000000000..1c5eb0a34c --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.type.FrameBody; +import org.apache.qpid.amqp_1_0.type.SaslFrameBody; + +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; + +class TCPTransportProvier implements TransportProvider +{ + private final String _transport; + + public TCPTransportProvier(final String transport) + { + _transport = transport; + } + + @Override + public void connect(final ConnectionEndpoint conn, + final String address, + final int port, + final boolean ssl, + final ExceptionHandler exceptionHandler) throws ConnectionException + { + try + { + final Socket s; + if(ssl) + { + s = SSLSocketFactory.getDefault().createSocket(address, port); + } + else + { + s = new Socket(address, port); + } + + conn.setRemoteAddress(s.getRemoteSocketAddress()); + + + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn); + + ConnectionHandler.BytesSource src; + + if(conn.requiresSASL()) + { + ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn); + + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,conn.getDescribedTypeRegistry()), + new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry()) + ); + + conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn,(byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry()) + ); + } + + + final OutputStream outputStream = s.getOutputStream(); + ConnectionHandler.BytesOutputHandler outputHandler = + new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler); + Thread outputThread = new Thread(outputHandler); + outputThread.setDaemon(true); + outputThread.start(); + conn.setFrameOutputHandler(out); + + + final ConnectionHandler handler = new ConnectionHandler(conn); + final InputStream inputStream = s.getInputStream(); + + Thread inputThread = new Thread(new Runnable() + { + + public void run() + { + try + { + doRead(conn, handler, inputStream); + } + finally + { + if(conn.closedForInput() && conn.closedForOutput()) + { + try + { + synchronized (outputStream) + { + s.close(); + } + } + catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + }); + + inputThread.setDaemon(true); + inputThread.start(); + + } + catch (IOException e) + { + throw new ConnectionException(e); + } + } + private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream) + { + byte[] buf = new byte[2<<15]; + + + try + { + int read; + boolean done = false; + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) + { + ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); + while(bbuf.hasRemaining() && !handler.isDone()) + { + handler.parse(bbuf); + } + + + } + if(!handler.isDone()) + { + conn.inputClosed(); + if(conn.getConnectionEventListener() != null) + { + conn.getConnectionEventListener().closeReceived(); + } + } + } + catch (IOException e) + { + conn.inputClosed(); + e.printStackTrace(); + } + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java new file mode 100644 index 0000000000..2430b0e14b --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.type.FrameBody; + +public interface TransportProvider +{ + void connect(ConnectionEndpoint conn, + String address, + int port, + boolean ssl, + ExceptionHandler exceptionHandler) throws ConnectionException; +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java new file mode 100644 index 0000000000..82999c5ccc --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.util.Collection; + +public interface TransportProviderFactory +{ + Collection<String> getSupportedTransports(); + TransportProvider getProvider(String transport); +} diff --git a/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory new file mode 100644 index 0000000000..ffde030b30 --- /dev/null +++ b/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.amqp_1_0.client.TCPTransportProviderFactory
\ No newline at end of file diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java index b846b16722..b96e1ab47b 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java @@ -66,20 +66,7 @@ public class SymbolTypeConstructor extends VariableWidthTypeConstructor if(symbolVal == null) { ByteBuffer dup = in.duplicate(); - try - { - dup.limit(in.position()+size); - } - catch (IllegalArgumentException e) - { - System.err.println("in.position(): " + in.position()); - System.err.println("size: " + size); - System.err.println("dup.position(): " + dup.position()); - System.err.println("dup.capacity(): " + dup.capacity()); - System.err.println("dup.limit(): " + dup.limit()); - throw e; - - } + dup.limit(in.position()+size); CharBuffer charBuf = ASCII.decode(dup); diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java index 119dd6bf3a..54a4f22d48 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java @@ -65,6 +65,12 @@ public class ConnectionHandler public boolean parse(ByteBuffer in) { + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + Binary b = new Binary(in.array(),in.arrayOffset()+in.position(),in.remaining()); + RAW_LOGGER.fine("RECV [" + _connection.getRemoteAddress() + "] : " + b.toString()); + } + while(in.hasRemaining() && !isDone()) { _delegate = _delegate.parse(in); @@ -376,6 +382,47 @@ public class ConnectionHandler } + public static class SequentialFrameSource implements FrameSource + { + private Queue<FrameSource> _sources = new LinkedList<FrameSource>(); + + public SequentialFrameSource(FrameSource... sources) + { + _sources.addAll(Arrays.asList(sources)); + } + + public synchronized void addSource(FrameSource source) + { + _sources.add(source); + } + + @Override + public synchronized AMQFrame getNextFrame(final boolean wait) + { + FrameSource src = _sources.peek(); + while (src != null && src.closed()) + { + _sources.poll(); + src = _sources.peek(); + } + + if(src != null) + { + return src.getNextFrame(wait); + } + else + { + return null; + } + } + + public boolean closed() + { + return _sources.isEmpty(); + } + } + + public static class BytesOutputHandler implements Runnable, BytesProcessor { @@ -383,28 +430,28 @@ public class ConnectionHandler private BytesSource _bytesSource; private boolean _closed; private ConnectionEndpoint _conn; - private SocketExceptionHandler _exceptionHandler; - - public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler) - { - _outputStream = outputStream; - _bytesSource = source; - _conn = conn; - _exceptionHandler = exceptionHandler; - } + private ExceptionHandler _exceptionHandler; - public void run() - { + public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, ExceptionHandler exceptionHandler) + { + _outputStream = outputStream; + _bytesSource = source; + _conn = conn; + _exceptionHandler = exceptionHandler; + } - final BytesSource bytesSource = _bytesSource; + public void run() + { - while(!(_closed || bytesSource.closed())) - { - _bytesSource.getBytes(this, true); - } + final BytesSource bytesSource = _bytesSource; + while(!(_closed || bytesSource.closed())) + { + _bytesSource.getBytes(this, true); } + } + public void processBytes(final ByteBuffer buf) { try @@ -423,7 +470,7 @@ public class ConnectionHandler catch (IOException e) { _closed = true; - _exceptionHandler.processSocketException(e); + _exceptionHandler.handleException(e); } } } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java index 540aee0f8d..3adf3c0b18 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java @@ -23,9 +23,9 @@ package org.apache.qpid.amqp_1_0.framing; /** * Callback interface for processing socket exceptions. */ -public interface SocketExceptionHandler +public interface ExceptionHandler { - public void processSocketException(Exception exception); + public void handleException(Exception exception); } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 34ca851978..c37c52c6ea 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -580,19 +580,7 @@ public class SessionEndpoint if(payload != null && payloadSent < payload.remaining()) { payload = payload.duplicate(); -try -{ payload.position(payload.position()+payloadSent); -} -catch(IllegalArgumentException e) -{ - System.err.println("UNEXPECTED"); - System.err.println("Payload Position: " + payload.position()); - System.err.println("Payload Sent: " + payloadSent); - System.err.println("Payload Remaining: " + payload.remaining()); - throw e; - -} Transfer secondTransfer = new Transfer(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java index ae6e5ac43a..7338e5046a 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java @@ -25,7 +25,10 @@ import java.util.EnumSet; public enum Transport { TCP, - SSL; + SSL, + WS, + WSS, + SCTP; public static Transport valueOfObject(Object transportObject) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java index a4ce95e5aa..806e1f600a 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java @@ -24,7 +24,6 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; import java.net.InetSocketAddress; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -47,16 +46,17 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.plugin.QpidServiceLoader; +import org.apache.qpid.server.plugin.TransportProviderFactory; import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.server.transport.TransportProvider; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; public class AmqpPortAdapter extends PortAdapter { private final Broker _broker; - private IncomingNetworkTransport _transport; + private AcceptingTransport _transport; public AmqpPortAdapter(UUID id, Broker broker, Map<String, Object> attributes, Map<String, Object> defaultAttributes, TaskExecutor taskExecutor) { @@ -70,42 +70,36 @@ public class AmqpPortAdapter extends PortAdapter Collection<Transport> transports = getTransports(); Set<AmqpProtocolVersion> supported = convertFromModelProtocolsToAmqp(getProtocols()); - SSLContext sslContext = null; - if (transports.contains(Transport.SSL)) + TransportProvider transportProvider = null; + final HashSet<Transport> transportSet = new HashSet<Transport>(transports); + for(TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class)) { - sslContext = createSslContext(); + if(tpf.getSupportedTransports().contains(transports)) + { + transportProvider = tpf.getTransportProvider(transportSet); + } } - AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply(); - - String bindingAddress = (String) getAttribute(Port.BINDING_ADDRESS); - if (WILDCARD_ADDRESS.equals(bindingAddress)) + if(transportProvider == null) { - bindingAddress = null; + throw new IllegalConfigurationException("No transport providers found which can satisfy the requirement to support the transports: " + transports); } - Integer port = (Integer) getAttribute(Port.PORT); - InetSocketAddress bindingSocketAddress = null; - if ( bindingAddress == null ) - { - bindingSocketAddress = new InetSocketAddress(port); - } - else + + SSLContext sslContext = null; + if (transports.contains(Transport.SSL)) { - bindingSocketAddress = new InetSocketAddress(bindingAddress, port); + sslContext = createSslContext(); } - final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration( - bindingSocketAddress, (Boolean)getAttribute(TCP_NO_DELAY), - (Integer)getAttribute(SEND_BUFFER_SIZE), (Integer)getAttribute(RECEIVE_BUFFER_SIZE), - (Boolean)getAttribute(NEED_CLIENT_AUTH), (Boolean)getAttribute(WANT_CLIENT_AUTH)); + AmqpProtocolVersion defaultSupportedProtocolReply = getDefaultAmqpSupportedReply(); - _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); - final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( - _broker, transports.contains(Transport.TCP) ? sslContext : null, - settings.wantClientAuth(), settings.needClientAuth(), - supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL); + _transport = transportProvider.createTransport(transportSet, + sslContext, + this, + supported, + defaultSupportedProtocolReply); - _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext); + _transport.start(); for(Transport transport : getTransports()) { CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort())); @@ -211,65 +205,6 @@ public class AmqpPortAdapter extends PortAdapter return null; } - class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration - { - private final InetSocketAddress _bindingSocketAddress; - private final Boolean _tcpNoDelay; - private final Integer _sendBufferSize; - private final Integer _receiveBufferSize; - private final boolean _needClientAuth; - private final boolean _wantClientAuth; - - public ServerNetworkTransportConfiguration( - InetSocketAddress bindingSocketAddress, boolean tcpNoDelay, - int sendBufferSize, int receiveBufferSize, - boolean needClientAuth, boolean wantClientAuth) - { - _bindingSocketAddress = bindingSocketAddress; - _tcpNoDelay = tcpNoDelay; - _sendBufferSize = sendBufferSize; - _receiveBufferSize = receiveBufferSize; - _needClientAuth = needClientAuth; - _wantClientAuth = wantClientAuth; - } - - @Override - public boolean wantClientAuth() - { - return _wantClientAuth; - } - - @Override - public boolean needClientAuth() - { - return _needClientAuth; - } - - @Override - public Boolean getTcpNoDelay() - { - return _tcpNoDelay; - } - - @Override - public Integer getSendBufferSize() - { - return _sendBufferSize; - } - - @Override - public Integer getReceiveBufferSize() - { - return _receiveBufferSize; - } - - @Override - public InetSocketAddress getAddress() - { - return _bindingSocketAddress; - } - }; - public String toString() { return getName(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java new file mode 100644 index 0000000000..81a6adc40e --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.transport.TransportProvider; + +import java.util.Set; + +public interface TransportProviderFactory extends Pluggable +{ + Set<Set<Transport>> getSupportedTransports(); + + TransportProvider getTransportProvider(Set<Transport> transports); + + +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 47b578c4ef..b2b585f692 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.SSLStatus; @@ -274,9 +275,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void received(ByteBuffer msg) { - _lastReadTime = System.currentTimeMillis(); - ByteBuffer msgheader = msg.duplicate(); + ByteBuffer msgheader = msg.duplicate().slice(); + if(_header.remaining() > msgheader.limit()) { msg.position(msg.limit()); @@ -329,6 +330,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } + if(newDelegate == null && looksLikeSSL(headerBytes)) { if(_sslContext != null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java new file mode 100644 index 0000000000..bf7f2835e0 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/AcceptingTransport.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +public interface AcceptingTransport +{ + public void start(); + public void close(); +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java new file mode 100644 index 0000000000..9aef42e6d0 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.network.IncomingNetworkTransport; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.Set; + +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + +class TCPandSSLTransport implements AcceptingTransport +{ + private IncomingNetworkTransport _networkTransport; + private Set<Transport> _transports; + private SSLContext _sslContext; + private InetSocketAddress _bindingSocketAddress; + private Port _port; + private Set<AmqpProtocolVersion> _supported; + private AmqpProtocolVersion _defaultSupportedProtocolReply; + + TCPandSSLTransport(final Set<Transport> transports, + final SSLContext sslContext, + final Port port, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + _transports = transports; + _sslContext = sslContext; + _port = port; + _supported = supported; + _defaultSupportedProtocolReply = defaultSupportedProtocolReply; + } + + @Override + public void start() + { + String bindingAddress = (String) _port.getAttribute(Port.BINDING_ADDRESS); + if (WILDCARD_ADDRESS.equals(bindingAddress)) + { + bindingAddress = null; + } + Integer port = (Integer) _port.getAttribute(Port.PORT); + if ( bindingAddress == null ) + { + _bindingSocketAddress = new InetSocketAddress(port); + } + else + { + _bindingSocketAddress = new InetSocketAddress(bindingAddress, port); + } + + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(); + _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); + final MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory( + _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null, + settings.wantClientAuth(), settings.needClientAuth(), + _supported, + _defaultSupportedProtocolReply, + _port, + _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL); + + _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext); + } + + @Override + public void close() + { + _networkTransport.close(); + } + + class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration + { + public ServerNetworkTransportConfiguration() + { + } + + @Override + public boolean wantClientAuth() + { + return (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH); + } + + @Override + public boolean needClientAuth() + { + return (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH); + } + + @Override + public Boolean getTcpNoDelay() + { + return (Boolean)_port.getAttribute(Port.TCP_NO_DELAY); + } + + @Override + public Integer getSendBufferSize() + { + return (Integer)_port.getAttribute(Port.SEND_BUFFER_SIZE); + } + + @Override + public Integer getReceiveBufferSize() + { + return (Integer)_port.getAttribute(Port.RECEIVE_BUFFER_SIZE); + } + + @Override + public InetSocketAddress getAddress() + { + return _bindingSocketAddress; + } + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java new file mode 100644 index 0000000000..33c7774a29 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProvider.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; + +import javax.net.ssl.SSLContext; +import java.util.Set; + +class TCPandSSLTransportProvider implements TransportProvider +{ + @Override + public AcceptingTransport createTransport(final Set<Transport> transports, + final SSLContext sslContext, + final Port port, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + return new TCPandSSLTransport(transports, sslContext, port, supported, defaultSupportedProtocolReply); + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java new file mode 100644 index 0000000000..9b61d1d037 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransportProviderFactory.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.plugin.TransportProviderFactory; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; + +public class TCPandSSLTransportProviderFactory implements TransportProviderFactory +{ + + private static final String TYPE = "TCPandSSL"; + + @Override + public Set<Set<Transport>> getSupportedTransports() + { + return new HashSet<Set<Transport>>(Arrays.asList(EnumSet.of(Transport.TCP), + EnumSet.of(Transport.SSL), + EnumSet.of(Transport.TCP,Transport.SSL))); + } + + @Override + public TransportProvider getTransportProvider(final Set<Transport> transports) + { + return new TCPandSSLTransportProvider(); + } + + @Override + public String getType() + { + return TYPE; + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java new file mode 100644 index 0000000000..86b1e31727 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/transport/TransportProvider.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.Set; + +public interface TransportProvider +{ + AcceptingTransport createTransport(Set<Transport> transports, + SSLContext sslContext, + Port port, + Set<AmqpProtocolVersion> supported, + AmqpProtocolVersion defaultSupportedProtocolReply); +} diff --git a/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory new file mode 100644 index 0000000000..3838a9c39f --- /dev/null +++ b/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.transport.TCPandSSLTransportProviderFactory
\ No newline at end of file diff --git a/java/broker-plugins/websocket/build.xml b/java/broker-plugins/websocket/build.xml new file mode 100644 index 0000000000..fc3dd3b846 --- /dev/null +++ b/java/broker-plugins/websocket/build.xml @@ -0,0 +1,32 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> +<project name="Qpid Broker-Plugins Websocket Transport" default="build"> + <property name="module.depends" value="common broker-core" /> + <property name="module.test.depends" value="qpid-test-utils broker-core/tests" /> + + <property name="module.genpom" value="true"/> + <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/> + + <property name="broker.plugin" value="true"/> + <property name="broker-plugins-websocket.libs" value="" /> + + <import file="../../module.xml" /> + + <target name="bundle" depends="bundle-tasks"/> +</project> diff --git a/java/broker-plugins/websocket/pom.xml b/java/broker-plugins/websocket/pom.xml new file mode 100644 index 0000000000..2029bd33aa --- /dev/null +++ b/java/broker-plugins/websocket/pom.xml @@ -0,0 +1,158 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>qpid-project</artifactId> + <groupId>org.apache.qpid</groupId> + <version>0.26-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>qpid-broker-plugins-websocket</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-core</artifactId> + <version>0.26-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-servlet_2.5_spec</artifactId> + <version>1.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-continuation</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-continuation</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-websocket</artifactId> + <version>7.6.10.v20130312</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-io</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + </build> + +</project>
\ No newline at end of file diff --git a/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java new file mode 100644 index 0000000000..b44ed70040 --- /dev/null +++ b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -0,0 +1,294 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketHandler; + +import javax.net.ssl.SSLContext; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.Set; + +class WebSocketProvider implements AcceptingTransport +{ + public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10"; + private final Transport _transport; + private final SSLContext _sslContext; + private final Port _port; + private final Set<AmqpProtocolVersion> _supported; + private final AmqpProtocolVersion _defaultSupportedProtocolReply; + private final ProtocolEngineFactory _factory; + private Server _server; + + WebSocketProvider(final Transport transport, + final SSLContext sslContext, + final Port port, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + _transport = transport; + _sslContext = sslContext; + _port = port; + _supported = supported; + _defaultSupportedProtocolReply = defaultSupportedProtocolReply; + _factory = new MultiVersionProtocolEngineFactory( + _port.getParent(Broker.class), null, + (Boolean)_port.getAttribute(Port.WANT_CLIENT_AUTH), + (Boolean)_port.getAttribute(Port.NEED_CLIENT_AUTH), + _supported, + _defaultSupportedProtocolReply, + _port, + _transport); + + } + + @Override + public void start() + { + _server = new Server(); + + Connector connector = null; + + + if (_transport == Transport.WS) + { + connector = new SelectChannelConnector(); + } + else if (_transport == Transport.WSS) + { + SslContextFactory factory = new SslContextFactory(); + factory.setSslContext(_sslContext); + connector = new SslSocketConnector(factory); + } + else + { + throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport); + } + String bindingAddress = _port.getBindingAddress(); + if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) + { + connector.setHost(bindingAddress.trim()); + } + connector.setPort(_port.getPort()); + _server.addConnector(connector); + + WebSocketHandler wshandler = new WebSocketHandler() + { + @Override + public WebSocket doWebSocketConnect(final HttpServletRequest request, final String protocol) + { + SocketAddress remoteAddress = new InetSocketAddress(request.getRemoteHost(), request.getRemotePort()); + SocketAddress localAddress = new InetSocketAddress(request.getLocalName(), request.getLocalPort()); + return AMQP_WEBSOCKET_SUBPROTOCOL.equals(protocol) ? new AmqpWebSocket(_transport, localAddress, remoteAddress) : null; + } + }; + + _server.setHandler(wshandler); + try + { + _server.start(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + } + + @Override + public void close() + { + + } + + private class AmqpWebSocket implements WebSocket,WebSocket.OnBinaryMessage + { + private final SocketAddress _localAddress; + private final SocketAddress _remoteAddress; + private Connection _connection; + private final Transport _transport; + private ProtocolEngine _engine; + + private AmqpWebSocket(final Transport transport, + final SocketAddress localAddress, + final SocketAddress remoteAddress) + { + _transport = transport; + _localAddress = localAddress; + _remoteAddress = remoteAddress; + } + + @Override + public void onMessage(final byte[] data, final int offset, final int length) + { + _engine.received(ByteBuffer.wrap(data, offset, length).slice()); + } + + @Override + public void onOpen(final Connection connection) + { + _connection = connection; + + _engine = _factory.newProtocolEngine(); + + final NetworkConnection connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress); + _engine.setNetworkConnection(connectionWrapper, connectionWrapper.getSender()); + + } + + @Override + public void onClose(final int closeCode, final String message) + { + _engine.closed(); + } + } + + private class ConnectionWrapper implements NetworkConnection, Sender<ByteBuffer> + { + private final WebSocket.Connection _connection; + private final SocketAddress _localAddress; + private final SocketAddress _remoteAddress; + private Principal _principal; + private int _maxWriteIdle; + private int _maxReadIdle; + + public ConnectionWrapper(final WebSocket.Connection connection, + final SocketAddress localAddress, + final SocketAddress remoteAddress) + { + _connection = connection; + _localAddress = localAddress; + _remoteAddress = remoteAddress; + } + + @Override + public Sender<ByteBuffer> getSender() + { + return this; + } + + @Override + public void start() + { + + } + + @Override + public void setIdleTimeout(final int i) + { + + } + + @Override + public void send(final ByteBuffer msg) + { + try + { + _connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining()); + } + catch (IOException e) + { + close(); + } + } + + @Override + public void flush() + { + + } + + @Override + public void close() + { + _connection.close(); + } + + @Override + public SocketAddress getRemoteAddress() + { + return _remoteAddress; + } + + @Override + public SocketAddress getLocalAddress() + { + return _localAddress; + } + + @Override + public void setMaxWriteIdle(final int sec) + { + _maxWriteIdle = sec; + } + + @Override + public void setMaxReadIdle(final int sec) + { + _maxReadIdle = sec; + } + + @Override + public void setPeerPrincipal(final Principal principal) + { + _principal = principal; + } + + @Override + public Principal getPeerPrincipal() + { + return _principal; + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + } +} diff --git a/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java new file mode 100644 index 0000000000..02d1100315 --- /dev/null +++ b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProvider.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.server.transport.TransportProvider; + +import javax.net.ssl.SSLContext; +import java.util.Set; + +class WebSocketTransportProvider implements TransportProvider +{ + public WebSocketTransportProvider() + { + } + + @Override + public AcceptingTransport createTransport(final Set<Transport> transports, + final SSLContext sslContext, + final Port port, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedProtocolReply) + { + return new WebSocketProvider(transports.iterator().next(), + sslContext, + port, + supported, + defaultSupportedProtocolReply); + } +} diff --git a/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java new file mode 100644 index 0000000000..662f16ce5b --- /dev/null +++ b/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketTransportProviderFactory.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport.websocket; + +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.plugin.TransportProviderFactory; +import org.apache.qpid.server.transport.TransportProvider; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + +public class WebSocketTransportProviderFactory implements TransportProviderFactory +{ + + private static final String TYPE = "Websocket"; + + @Override + public Set<Set<Transport>> getSupportedTransports() + { + return Collections.singleton((Set<Transport>)EnumSet.of(Transport.WS)); + } + + @Override + public TransportProvider getTransportProvider(final Set<Transport> transports) + { + return new WebSocketTransportProvider(); + } + + @Override + public String getType() + { + return TYPE; + } +} diff --git a/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory b/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory new file mode 100644 index 0000000000..55b88cc7be --- /dev/null +++ b/java/broker-plugins/websocket/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.transport.websocket.WebSocketTransportProviderFactory
\ No newline at end of file diff --git a/java/build.deps b/java/build.deps index 17e637cc97..58dea7009e 100644 --- a/java/build.deps +++ b/java/build.deps @@ -74,6 +74,7 @@ amqp-1-0-common.libs= amqp-1-0-client.libs= amqp-1-0-client-example.libs=${commons-cli} amqp-1-0-client-jms.libs=${geronimo-jms} +amqp-1-0-client-websocket.libs = ${jetty} ${jetty-continuation} ${jetty-security} ${jetty-http} ${jetty-io} ${jetty-servlet} ${jetty-util} ${servlet-api} ${jetty-websocket} tools.libs=${commons-configuration.libs} ${log4j} broker-core.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \ ${xalan} ${derby-db} ${commons-configuration.libs} \ diff --git a/java/build.xml b/java/build.xml index e761677f6d..219029b908 100644 --- a/java/build.xml +++ b/java/build.xml @@ -34,7 +34,7 @@ <findSubProjects name="broker-plugins" dir="broker-plugins" excludes="${broker-plugins-exclude}"/> <findSubProjects name="client-plugins" dir="client-plugins"/> - <property name="modules.core" value="qpid-test-utils common management/common amqp-1-0-common broker-core broker client amqp-1-0-client amqp-1-0-client-jms tools"/> + <property name="modules.core" value="qpid-test-utils common management/common amqp-1-0-common broker-core broker client amqp-1-0-client amqp-1-0-client-jms amqp-1-0-client-websocket tools"/> <property name="modules.examples" value="client/example management/example amqp-1-0-client/example amqp-1-0-client-jms/example"/> <property name="modules.tests" value="systests perftests"/> <property name="modules.plugin" value="${broker-plugins} ${client-plugins}"/> diff --git a/java/ivy.nexus.xml b/java/ivy.nexus.xml index d492169ff3..ee0ec27d48 100644 --- a/java/ivy.nexus.xml +++ b/java/ivy.nexus.xml @@ -135,6 +135,12 @@ <artifact name="qpid-amqp-1-0-client-jms" type="jar.asc" ext="jar.asc"/> <artifact name="qpid-amqp-1-0-client-jms" type="source" ext="jar" e:classifier="sources"/> <artifact name="qpid-amqp-1-0-client-jms" type="source.asc" ext="jar.asc" e:classifier="sources"/> + <artifact name="qpid-amqp-1-0-client-websocket" type="pom" ext="pom"/> + <artifact name="qpid-amqp-1-0-client-websocket" type="pom.asc" ext="pom.asc"/> + <artifact name="qpid-amqp-1-0-client-websocket" type="jar" ext="jar"/> + <artifact name="qpid-amqp-1-0-client-websocket" type="jar.asc" ext="jar.asc"/> + <artifact name="qpid-amqp-1-0-client-websocket" type="source" ext="jar" e:classifier="sources"/> + <artifact name="qpid-amqp-1-0-client-websocket" type="source.asc" ext="jar.asc" e:classifier="sources"/> <artifact name="qpid-management-common" type="pom" ext="pom"/> <artifact name="qpid-management-common" type="pom.asc" ext="pom.asc"/> <artifact name="qpid-management-common" type="jar" ext="jar"/> diff --git a/java/module.xml b/java/module.xml index 28c31f967d..6d3f08a386 100644 --- a/java/module.xml +++ b/java/module.xml @@ -584,8 +584,8 @@ <copylist todir="${build.lib}" dir="${project.root}" files="${module.libs}"/> </target> - - <target name="libs-release" description="copy dependencies into module release"> + + <target name="libs-release-basic" description="copy dependencies into module release"> <!-- Copy the module dependencies --> <echo message="${module.libs}"/> <copylist todir="${module.release}/lib" dir="${project.root}" files="${module.libs}"/> @@ -594,10 +594,18 @@ <!-- Copy the jar for this module --> <copy todir="${module.release}/lib" failonerror="true"> <fileset file="${module.jar}"/> + </copy> + </target> + + <target name="libs-release-module-depends" description="copy dependencies into module release" unless="release.exclude.module.deps"> + <copy todir="${module.release}/lib" failonerror="true"> <fileset dir="${build.lib}" includes="${module.depends.jars}"/> </copy> </target> + <target name="libs-release" description="copy dependencies into module release" depends="libs-release-basic,libs-release-module-depends"> + </target> + <target name="resources" description="copy resources into build tree"> <copy todir="${build}" failonerror="false" flatten="true"> <fileset dir="${basedir}${file.separator}.." includes="${resources}"/> diff --git a/java/pom.xml b/java/pom.xml index 3fd4f4a0ab..1b313b3514 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -137,6 +137,7 @@ <modules> <module>amqp-1-0-client</module> <module>amqp-1-0-client-jms</module> + <module>amqp-1-0-client-websocket</module> <module>amqp-1-0-common</module> <module>broker</module> <module>broker-core</module> @@ -153,6 +154,7 @@ <module>broker-plugins/management-http</module> <module>broker-plugins/management-jmx</module> <module>broker-plugins/memory-store</module> + <module>broker-plugins/websocket</module> <module>common</module> <module>client</module> <module>management/common</module> |