diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java | 547 |
1 files changed, 0 insertions, 547 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java deleted file mode 100644 index e5360d32e0..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.BaseIoAcceptor; -import org.apache.mina.util.Queue; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.NamePreservingRunnable; -import edu.emory.mathcs.backport.java.util.concurrent.Executor; - -/** - * {@link IoAcceptor} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -public class MultiThreadSocketAcceptor extends SocketAcceptor -{ - /** - * @noinspection StaticNonFinalField - */ - private static volatile int nextId = 0; - - private final Executor executor; - private final Object lock = new Object(); - private final int id = nextId ++; - private final String threadName = "SocketAcceptor-" + id; - private final Map channels = new HashMap(); - - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); - - private final MultiThreadSocketIoProcessor[] ioProcessors; - private final int processorCount; - - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ - private Selector selector; - private Worker worker; - private int processorDistributor = 0; - - /** - * Create an acceptor with a single processing thread using a NewThreadExecutor - */ - public MultiThreadSocketAcceptor() - { - this( 1, new NewThreadExecutor() ); - } - - /** - * Create an acceptor with the desired number of processing threads - * - * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads - */ - public MultiThreadSocketAcceptor( int processorCount, Executor executor ) - { - if( processorCount < 1 ) - { - throw new IllegalArgumentException( "Must have at least one processor" ); - } - - this.executor = executor; - this.processorCount = processorCount; - ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; - - for( int i = 0; i < processorCount; i++ ) - { - ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor ); - } - } - - - /** - * Binds to the specified <code>address</code> and handles incoming connections with the specified - * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. - * - * @throws IOException if failed to bind - */ - public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException - { - if( handler == null ) - { - throw new NullPointerException( "handler" ); - } - - if( address != null && !( address instanceof InetSocketAddress ) ) - { - throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); - } - - if( config == null ) - { - config = getDefaultConfig(); - } - - RegistrationRequest request = new RegistrationRequest( address, handler, config ); - - synchronized( registerQueue ) - { - registerQueue.push( request ); - } - - startupWorker(); - - selector.wakeup(); - - synchronized( request ) - { - while( !request.done ) - { - try - { - request.wait(); - } - catch( InterruptedException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - } - } - - if( request.exception != null ) - { - throw request.exception; - } - } - - - private synchronized void startupWorker() throws IOException - { - synchronized( lock ) - { - if( worker == null ) - { - selector = Selector.open(); - worker = new Worker(); - - executor.execute( new NamePreservingRunnable( worker ) ); - } - } - } - - public void unbind( SocketAddress address ) - { - if( address == null ) - { - throw new NullPointerException( "address" ); - } - - CancellationRequest request = new CancellationRequest( address ); - - try - { - startupWorker(); - } - catch( IOException e ) - { - // IOException is thrown only when Worker thread is not - // running and failed to open a selector. We simply throw - // IllegalArgumentException here because we can simply - // conclude that nothing is bound to the selector. - throw new IllegalArgumentException( "Address not bound: " + address ); - } - - synchronized( cancelQueue ) - { - cancelQueue.push( request ); - } - - selector.wakeup(); - - synchronized( request ) - { - while( !request.done ) - { - try - { - request.wait(); - } - catch( InterruptedException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - } - } - - if( request.exception != null ) - { - request.exception.fillInStackTrace(); - - throw request.exception; - } - } - - - private class Worker implements Runnable - { - public void run() - { - Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName ); - - for( ; ; ) - { - try - { - int nKeys = selector.select(); - - registerNew(); - - if( nKeys > 0 ) - { - processSessions( selector.selectedKeys() ); - } - - cancelKeys(); - - if( selector.keys().isEmpty() ) - { - synchronized( lock ) - { - if( selector.keys().isEmpty() && - registerQueue.isEmpty() && - cancelQueue.isEmpty() ) - { - worker = null; - try - { - selector.close(); - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - - try - { - Thread.sleep( 1000 ); - } - catch( InterruptedException e1 ) - { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); - } - } - } - } - - private void processSessions( Set keys ) throws IOException - { - Iterator it = keys.iterator(); - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - - it.remove(); - - if( !key.isAcceptable() ) - { - continue; - } - - ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); - - SocketChannel ch = ssc.accept(); - - if( ch == null ) - { - continue; - } - - boolean success = false; - try - { - - RegistrationRequest req = ( RegistrationRequest ) key.attachment(); - - MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl( - MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(), - req.config, ch, req.handler, req.address ); - - // New Interface -// SocketSessionImpl session = new SocketSessionImpl( -// SocketAcceptor.this, nextProcessor(), getListeners(), -// req.config, ch, req.handler, req.address ); - - - getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); - req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); - req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); - session.getIoProcessor().addNew( session ); - success = true; - } - catch( Throwable t ) - { - ExceptionMonitor.getInstance().exceptionCaught( t ); - } - finally - { - if( !success ) - { - ch.close(); - } - } - } - } - } - - private MultiThreadSocketIoProcessor nextProcessor() - { - return ioProcessors[processorDistributor++ % processorCount]; - } - - - private void registerNew() - { - if( registerQueue.isEmpty() ) - { - return; - } - - for( ; ; ) - { - RegistrationRequest req; - - synchronized( registerQueue ) - { - req = ( RegistrationRequest ) registerQueue.pop(); - } - - if( req == null ) - { - break; - } - - ServerSocketChannel ssc = null; - - try - { - ssc = ServerSocketChannel.open(); - ssc.configureBlocking( false ); - - // Configure the server socket, - SocketAcceptorConfig cfg; - if( req.config instanceof SocketAcceptorConfig ) - { - cfg = ( SocketAcceptorConfig ) req.config; - } - else - { - cfg = ( SocketAcceptorConfig ) getDefaultConfig(); - } - - ssc.socket().setReuseAddress( cfg.isReuseAddress() ); - ssc.socket().setReceiveBufferSize( - ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() ); - - // and bind. - ssc.socket().bind( req.address, cfg.getBacklog() ); - if( req.address == null || req.address.getPort() == 0 ) - { - req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress(); - } - ssc.register( selector, SelectionKey.OP_ACCEPT, req ); - - synchronized( channels ) - { - channels.put( req.address, ssc ); - } - - getListeners().fireServiceActivated( - this, req.address, req.handler, req.config ); - } - catch( IOException e ) - { - req.exception = e; - } - finally - { - synchronized( req ) - { - req.done = true; - - req.notifyAll(); - } - - if( ssc != null && req.exception != null ) - { - try - { - ssc.close(); - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - } - } - } - } - - - private void cancelKeys() - { - if( cancelQueue.isEmpty() ) - { - return; - } - - for( ; ; ) - { - CancellationRequest request; - - synchronized( cancelQueue ) - { - request = ( CancellationRequest ) cancelQueue.pop(); - } - - if( request == null ) - { - break; - } - - ServerSocketChannel ssc; - synchronized( channels ) - { - ssc = ( ServerSocketChannel ) channels.remove( request.address ); - } - - // close the channel - try - { - if( ssc == null ) - { - request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); - } - else - { - SelectionKey key = ssc.keyFor( selector ); - request.registrationRequest = ( RegistrationRequest ) key.attachment(); - key.cancel(); - - selector.wakeup(); // wake up again to trigger thread death - - ssc.close(); - } - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - finally - { - synchronized( request ) - { - request.done = true; - request.notifyAll(); - } - - if( request.exception == null ) - { - getListeners().fireServiceDeactivated( - this, request.address, - request.registrationRequest.handler, - request.registrationRequest.config ); - } - } - } - } - - private static class RegistrationRequest - { - private InetSocketAddress address; - private final IoHandler handler; - private final IoServiceConfig config; - private IOException exception; - private boolean done; - - private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config ) - { - this.address = ( InetSocketAddress ) address; - this.handler = handler; - this.config = config; - } - } - - - private static class CancellationRequest - { - private final SocketAddress address; - private boolean done; - private RegistrationRequest registrationRequest; - private RuntimeException exception; - - private CancellationRequest( SocketAddress address ) - { - this.address = address; - } - } -} |