diff options
Diffstat (limited to 'trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java')
-rw-r--r-- | trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java | 486 |
1 files changed, 0 insertions, 486 deletions
diff --git a/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java deleted file mode 100644 index b1612840db..0000000000 --- a/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoConnectorConfig; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.AbstractIoFilterChain; -import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -/** - * {@link IoConnector} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -public class MultiThreadSocketConnector extends SocketConnector -{ - /** @noinspection StaticNonFinalField */ - private static volatile int nextId = 0; - - private final Object lock = new Object(); - private final int id = nextId++; - private final String threadName = "SocketConnector-" + id; - private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); - private final Queue connectQueue = new Queue(); - private final MultiThreadSocketIoProcessor[] ioProcessors; - private final int processorCount; - private final Executor executor; - - /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ - private Selector selector; - private Worker worker; - private int processorDistributor = 0; - private int workerTimeout = 60; // 1 min. - - /** Create a connector with a single processing thread using a NewThreadExecutor */ - public MultiThreadSocketConnector() - { - this(1, new NewThreadExecutor()); - } - - /** - * Create a connector with the desired number of processing threads - * - * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads - */ - public MultiThreadSocketConnector(int processorCount, Executor executor) - { - if (processorCount < 1) - { - throw new IllegalArgumentException("Must have at least one processor"); - } - - this.executor = executor; - this.processorCount = processorCount; - ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; - - for (int i = 0; i < processorCount; i++) - { - ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); - } - } - - /** - * How many seconds to keep the connection thread alive between connection requests - * - * @return Number of seconds to keep connection thread alive - */ - public int getWorkerTimeout() - { - return workerTimeout; - } - - /** - * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. - * - * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 - */ - public void setWorkerTimeout(int workerTimeout) - { - if (workerTimeout < 0) - { - throw new IllegalArgumentException("Must be >= 0"); - } - this.workerTimeout = workerTimeout; - } - - public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - return connect(address, null, handler, config); - } - - public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoServiceConfig config) - { - if (address == null) - { - throw new NullPointerException("address"); - } - if (handler == null) - { - throw new NullPointerException("handler"); - } - - if (!(address instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected address type: " - + address.getClass()); - } - - if (localAddress != null && !(localAddress instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected local address type: " - + localAddress.getClass()); - } - - if (config == null) - { - config = getDefaultConfig(); - } - - SocketChannel ch = null; - boolean success = false; - try - { - ch = SocketChannel.open(); - ch.socket().setReuseAddress(true); - if (localAddress != null) - { - ch.socket().bind(localAddress); - } - - ch.configureBlocking(false); - - if (ch.connect(address)) - { - DefaultConnectFuture future = new DefaultConnectFuture(); - newSession(ch, handler, config, future); - success = true; - return future; - } - - success = true; - } - catch (IOException e) - { - return DefaultConnectFuture.newFailedFuture(e); - } - finally - { - if (!success && ch != null) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - - ConnectionRequest request = new ConnectionRequest(ch, handler, config); - synchronized (lock) - { - try - { - startupWorker(); - } - catch (IOException e) - { - try - { - ch.close(); - } - catch (IOException e2) - { - ExceptionMonitor.getInstance().exceptionCaught(e2); - } - - return DefaultConnectFuture.newFailedFuture(e); - } - } - - synchronized (connectQueue) - { - connectQueue.push(request); - } - selector.wakeup(); - - return request; - } - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - executor.execute(new NamePreservingRunnable(worker)); - } - } - - private void registerNew() - { - if (connectQueue.isEmpty()) - { - return; - } - - for (; ;) - { - ConnectionRequest req; - synchronized (connectQueue) - { - req = (ConnectionRequest) connectQueue.pop(); - } - - if (req == null) - { - break; - } - - SocketChannel ch = req.channel; - try - { - ch.register(selector, SelectionKey.OP_CONNECT, req); - } - catch (IOException e) - { - req.setException(e); - } - } - } - - private void processSessions(Set keys) - { - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isConnectable()) - { - continue; - } - - SocketChannel ch = (SocketChannel) key.channel(); - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - boolean success = false; - try - { - ch.finishConnect(); - newSession(ch, entry.handler, entry.config, entry); - success = true; - } - catch (Throwable e) - { - entry.setException(e); - } - finally - { - key.cancel(); - if (!success) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - keys.clear(); - } - - private void processTimedOutSessions(Set keys) - { - long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isValid()) - { - continue; - } - - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - if (currentTime >= entry.deadline) - { - entry.setException(new ConnectException()); - try - { - key.channel().close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - key.cancel(); - } - } - } - } - - private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - { - MultiThreadSocketSessionImpl session = - new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(), - config, ch, handler, ch.socket().getRemoteSocketAddress()); - - //new interface -// SocketSessionImpl session = new SocketSessionImpl( -// this, nextProcessor(), getListeners(), -// config, ch, handler, ch.socket().getRemoteSocketAddress() ); - try - { - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getThreadModel().buildFilterChain(session.getFilterChain()); - } - catch (Throwable e) - { - throw (IOException) new IOException("Failed to create a session.").initCause(e); - } - - // Set the ConnectFuture of the specified session, which will be - // removed and notified by AbstractIoFilterChain eventually. - session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); - - // Forward the remaining process to the SocketIoProcessor. - session.getIoProcessor().addNew(session); - } - - private MultiThreadSocketIoProcessor nextProcessor() - { - return ioProcessors[processorDistributor++ % processorCount]; - } - - private class Worker implements Runnable - { - private long lastActive = System.currentTimeMillis(); - - public void run() - { - Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName); - - for (; ;) - { - try - { - int nKeys = selector.select(1000); - - registerNew(); - - if (nKeys > 0) - { - processSessions(selector.selectedKeys()); - } - - processTimedOutSessions(selector.keys()); - - if (selector.keys().isEmpty()) - { - if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) - { - synchronized (lock) - { - if (selector.keys().isEmpty() && - connectQueue.isEmpty()) - { - worker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - else - { - lastActive = System.currentTimeMillis(); - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - } - } - - private class ConnectionRequest extends DefaultConnectFuture - { - private final SocketChannel channel; - private final long deadline; - private final IoHandler handler; - private final IoServiceConfig config; - - private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) - { - this.channel = channel; - long timeout; - if (config instanceof IoConnectorConfig) - { - timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); - } - else - { - timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); - } - this.deadline = System.currentTimeMillis() + timeout; - this.handler = handler; - this.config = config; - } - } -} |