diff options
11 files changed, 575 insertions, 52 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 8c2da9d77a..2ca5d28f42 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -25,14 +25,7 @@ import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoTransport; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; -import org.apache.qpid.util.Strings; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslServer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -40,6 +33,14 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; + +import org.apache.qpid.transport.network.security.SecurityLayer; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; +import org.apache.qpid.util.Strings; + /** * Connection @@ -109,7 +110,8 @@ public class Connection extends ConnectionInvoker private Map<String,Object> _serverProperties; private String userID; private ConnectionSettings conSettings; - + private SecurityLayer securityLayer; + // want to make this final private int _connectionId; @@ -215,10 +217,17 @@ public class Connection extends ConnectionInvoker userID = settings.getUsername(); delegate = new ClientDelegate(settings); - IoTransport.connect(settings.getHost(), + /*IoTransport.connect(settings.getHost(), settings.getPort(), ConnectionBinding.get(this), - settings.isUseSSL()); + settings.isUseSSL());*/ + + TransportBuilder transport = new TransportBuilder(); + transport.init(this); + this.sender = transport.buildSenderPipe(); + transport.buildReceiverPipe(this); + this.securityLayer = transport.getSecurityLayer(); + send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -633,5 +642,10 @@ public class Connection extends ConnectionInvoker { return conSettings; } + + public SecurityLayer getSecurityLayer() + { + return securityLayer; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index aca53d9a6a..08678b213b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -36,47 +36,32 @@ public class ConnectionSettings String username = "guest"; String password = "guest"; int port = 5672; - boolean tcpNodelay; + boolean tcpNodelay = Boolean.getBoolean("amqj.tcp_nodelay"); int maxChannelCount = 32767; int maxFrameSize = 65535; int heartbeatInterval; + int readBufferSize = 65535; + int writeBufferSize = 65535; + long transportTimeout = 60000; // SSL props boolean useSSL; - String keyStorePath; - String keyStorePassword; - String keyStoreCertType; - String trustStoreCertType; - String trustStorePath; - String trustStorePassword; + String keyStorePath = System.getProperty("javax.net.ssl.keyStore"); + String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword"); + String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");; + String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");; + String trustStorePath = System.getProperty("javax.net.ssl.trustStore");; + String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");; String certAlias; boolean verifyHostname; // SASL props - String saslMechs = "PLAIN"; - String saslProtocol = "AMQP"; - String saslServerName = "localhost"; + String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN"); + String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP"); + String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost"); boolean useSASLEncryption; - - private Connection owner; - + private Map<String, Object> _clientProperties; - - public Connection getConnection() - { - return owner; - } - - public void setConnection(Connection owner) - { - if (this.owner != null) - { - throw new IllegalStateException( - "A ConnectionSettings instance can be associated" + - " with one and only one Connection instance"); - } - this.owner = owner; - } public boolean isTcpNodelay() { @@ -318,4 +303,34 @@ public class ConnectionSettings this.trustStoreCertType = trustStoreCertType; } + public int getReadBufferSize() + { + return readBufferSize; + } + + public void setReadBufferSize(int readBufferSize) + { + this.readBufferSize = readBufferSize; + } + + public int getWriteBufferSize() + { + return writeBufferSize; + } + + public void setWriteBufferSize(int writeBufferSize) + { + this.writeBufferSize = writeBufferSize; + } + + public long getTransportTimeout() + { + return transportTimeout; + } + + public void setTransportTimeout(long transportTimeout) + { + this.transportTimeout = transportTimeout; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java new file mode 100644 index 0000000000..d1ae95a3bb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.security.SecurityLayer; + +public class TransportBuilder +{ + private Connection con; + private ConnectionSettings settings; + private NetworkTransport transport; + private SecurityLayer securityLayer = new SecurityLayer(); + + public void init(Connection con) throws TransportException + { + this.con = con; + this.settings = con.getConnectionSettings(); + transport = Transport.getTransport(); + transport.init(settings); + securityLayer.init(con); + } + + public Sender<ProtocolEvent> buildSenderPipe() + { + ConnectionSettings settings = con.getConnectionSettings(); + + // Io layer + Sender<ByteBuffer> sender = transport.sender(); + + // Security layer + sender = securityLayer.sender(sender); + + Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize()); + return dis; + } + + public void buildReceiverPipe(Receiver<ProtocolEvent> delegate) + { + ConnectionSettings settings = con.getConnectionSettings(); + + Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate)); + + // Security layer + receiver = securityLayer.receiver(receiver); + + //Io layer + transport.receiver(receiver); + } + + public SecurityLayer getSecurityLayer() + { + return securityLayer; + } + +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java new file mode 100644 index 0000000000..5e12d7e7c6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ConnectionSettings; + +public interface NetworkTransport +{ + public void init(ConnectionSettings settings); + + public Sender<ByteBuffer> sender(); + + public void receiver(Receiver<ByteBuffer> delegate); + + public void close(); +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java new file mode 100644 index 0000000000..f0bf04d04f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +import org.apache.qpid.transport.TransportException; + +public class Transport +{ + private final static Class<?> transportClass; + + static + { + try + { + transportClass = + Class.forName(System.getProperty("qpid.transport", + "org.apache.qpid.transport.network.io.IoNetworkTransport")); + + } + catch(Exception e) + { + throw new Error("Error occured while loading Qpid Transport",e); + } + } + + public static NetworkTransport getTransport() throws TransportException + { + try + { + return (NetworkTransport)transportClass.newInstance(); + } + catch (Exception e) + { + throw new TransportException("Error while creating a new transport instance",e); + } + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java new file mode 100644 index 0000000000..72520c64ef --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java @@ -0,0 +1,15 @@ +package org.apache.qpid.transport.network.io; + +import java.net.Socket; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Sender; + +public interface IoContext +{ + Sender<ByteBuffer> getSender(); + + IoReceiver getReceiver(); + + Socket getSocket(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java new file mode 100644 index 0000000000..4e6d2130ae --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -0,0 +1,120 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.util.Logger; + +public class IoNetworkTransport implements NetworkTransport, IoContext +{ + static + { + org.apache.mina.common.ByteBuffer.setAllocator + (new org.apache.mina.common.SimpleByteBufferAllocator()); + org.apache.mina.common.ByteBuffer.setUseDirectBuffers + (Boolean.getBoolean("amqj.enableDirectBuffers")); + } + + private static final Logger log = Logger.get(IoNetworkTransport.class); + + private Socket socket; + private Sender<ByteBuffer> sender; + private IoReceiver receiver; + private long timeout = 60000; + private ConnectionSettings settings; + + @Override + public void init(ConnectionSettings settings) + { + try + { + this.settings = settings; + InetAddress address = InetAddress.getByName(settings.getHost()); + socket = new Socket(); + socket.setReuseAddress(true); + socket.setTcpNoDelay(settings.isTcpNodelay()); + + log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.setSendBufferSize(settings.getWriteBufferSize()); + socket.setReceiveBufferSize(settings.getReadBufferSize()); + + log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.connect(new InetSocketAddress(address, settings.getPort())); + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + } + + @Override + public void receiver(Receiver<ByteBuffer> delegate) + { + receiver = new IoReceiver(this, delegate, + 2*settings.getReadBufferSize() , timeout); + } + + @Override + public Sender<ByteBuffer> sender() + { + return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); + } + + @Override + public void close() + { + + } + + public Sender<ByteBuffer> getSender() + { + return sender; + } + + public IoReceiver getReceiver() + { + return receiver; + } + + public Socket getSocket() + { + return socket; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index e0e06d22ec..19a683d505 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -42,7 +42,7 @@ final class IoReceiver implements Runnable private static final Logger log = Logger.get(IoReceiver.class); - private final IoTransport transport; + private final IoContext ioCtx; private final Receiver<ByteBuffer> receiver; private final int bufferSize; private final Socket socket; @@ -52,13 +52,13 @@ final class IoReceiver implements Runnable private final boolean shutdownBroken = ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); - public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, + public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) { - this.transport = transport; + this.ioCtx = ioCtx; this.receiver = receiver; this.bufferSize = bufferSize; - this.socket = transport.getSocket(); + this.socket = ioCtx.getSocket(); this.timeout = timeout; try diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 383fd6131a..66b97e8225 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -43,7 +43,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> // we can test other cases as well private final static int START = Integer.MAX_VALUE - 10; - private final IoTransport transport; + private final IoContext ioCtx; private final long timeout; private final Socket socket; private final OutputStream out; @@ -60,10 +60,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private volatile Throwable exception = null; - public IoSender(IoTransport transport, int bufferSize, long timeout) + public IoSender(IoContext ioCtx, int bufferSize, long timeout) { - this.transport = transport; - this.socket = transport.getSocket(); + this.ioCtx = ioCtx; + this.socket = ioCtx.getSocket(); this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; @@ -207,7 +207,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> throw new SenderException("join timed out"); } } - transport.getReceiver().close(false); + ioCtx.getReceiver().close(false); } catch (InterruptedException e) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 06d53aaeee..bfdbb34978 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -51,7 +51,7 @@ import org.apache.qpid.transport.util.Logger; * SO_RCVBUF - amqj.receiveBufferSize * SO_SNDBUF - amqj.sendBufferSize */ -public final class IoTransport<E> +public final class IoTransport<E> implements IoContext { static @@ -119,17 +119,17 @@ public final class IoTransport<E> } } - Sender<ByteBuffer> getSender() + public Sender<ByteBuffer> getSender() { return sender; } - IoReceiver getReceiver() + public IoReceiver getReceiver() { return receiver; } - Socket getSocket() + public Socket getSocket() { return socket; } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java new file mode 100644 index 0000000000..bb877d4185 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -0,0 +1,185 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.security; + +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionListener; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.security.sasl.SASLReceiver; +import org.apache.qpid.transport.network.security.sasl.SASLSender; +import org.apache.qpid.transport.network.security.ssl.SSLReceiver; +import org.apache.qpid.transport.network.security.ssl.SSLSender; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +public class SecurityLayer +{ + ConnectionSettings settings; + Connection con; + SSLSecurityLayer sslLayer; + SASLSecurityLayer saslLayer; + + public void init(Connection con) throws TransportException + { + this.con = con; + this.settings = con.getConnectionSettings(); + if (settings.isUseSSL()) + { + sslLayer = new SSLSecurityLayer(); + } + if (settings.isUseSASLEncryption()) + { + saslLayer = new SASLSecurityLayer(); + } + + } + + public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) + { + Sender<ByteBuffer> sender = delegate; + + if (settings.isUseSSL()) + { + sender = sslLayer.sender(sender); + } + + if (settings.isUseSASLEncryption()) + { + sender = saslLayer.sender(sender); + } + + return sender; + } + + public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate) + { + Receiver<ByteBuffer> receiver = delegate; + + if (settings.isUseSSL()) + { + receiver = sslLayer.receiver(receiver); + } + + if (settings.isUseSASLEncryption()) + { + receiver = saslLayer.receiver(receiver); + } + + return receiver; + } + + public String getUserID() + { + if (settings.isUseSSL()) + { + return sslLayer.getUserID(); + } + else + { + return null; + } + } + + class SSLSecurityLayer + { + SSLEngine engine; + SSLSender sender; + + public SSLSecurityLayer() + { + SSLContext sslCtx; + try + { + sslCtx = SSLUtil.createSSLContext(settings); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + try + { + engine = sslCtx.createSSLEngine(); + engine.setUseClientMode(true); + } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + } + + public SSLSender sender(Sender<ByteBuffer> delegate) + { + sender = new SSLSender(engine,delegate); + sender.setConnectionSettings(settings); + return sender; + } + + public SSLReceiver receiver(Receiver<ByteBuffer> delegate) + { + if (sender == null) + { + throw new + IllegalStateException("SecurityLayer.sender method should be " + + "invoked before SecurityLayer.receiver"); + } + + SSLReceiver receiver = new SSLReceiver(engine,delegate,sender); + receiver.setConnectionSettings(settings); + return receiver; + } + + public String getUserID() + { + return null; + } + + } + + class SASLSecurityLayer + { + public SASLSecurityLayer() + { + } + + public SASLSender sender(Sender<ByteBuffer> delegate) + { + SASLSender sender = new SASLSender(delegate); + con.addConnectionListener((ConnectionListener)sender); + return sender; + } + + public SASLReceiver receiver(Receiver<ByteBuffer> delegate) + { + SASLReceiver receiver = new SASLReceiver(delegate); + con.addConnectionListener((ConnectionListener)receiver); + return receiver; + } + + } +} |