summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java187
1 files changed, 187 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
new file mode 100644
index 0000000000..79313712a5
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
+import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
+
+public class NonBlockingNetworkTransport
+{
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private SelectorThread _selector;
+
+
+ private Set<TransportEncryption> _encryptionSet;
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocketChannel _serverSocket;
+ private int _timeout;
+
+ public void close()
+ {
+ if(_selector != null)
+ {
+ try
+ {
+ if (_serverSocket != null)
+ {
+ _selector.cancelAcceptingSocket(_serverSocket);
+ _serverSocket.close();
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ finally
+ {
+
+ _selector.close();
+ }
+ }
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
+ {
+ try
+ {
+
+ _config = config;
+ _factory = factory;
+ _sslContext = sslContext;
+ _timeout = TIMEOUT;
+
+ InetSocketAddress address = config.getAddress();
+
+ _serverSocket = ServerSocketChannel.open();
+
+ _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ _serverSocket.bind(address);
+ _serverSocket.configureBlocking(false);
+ _encryptionSet = encryptionSet;
+
+ _selector = new SelectorThread(config.getAddress().toString(), this);
+ _selector.start();
+ _selector.addAcceptingSocket(_serverSocket);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+
+
+ }
+
+ public int getAcceptingPort()
+ {
+ return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+ }
+
+ public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ {
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ _config.getEnabledCipherSuites(),
+ _config.getDisabledCipherSuites(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ connection.start();
+
+ _selector.addConnection(connection);
+
+ }
+ else
+ {
+ socketChannel.close();
+ }
+ }
+
+
+}