summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java')
-rw-r--r--trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java547
1 files changed, 0 insertions, 547 deletions
diff --git a/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
deleted file mode 100644
index e5360d32e0..0000000000
--- a/trunk/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.util.NamePreservingRunnable;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-
-/**
- * {@link IoAcceptor} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public class MultiThreadSocketAcceptor extends SocketAcceptor
-{
- /**
- * @noinspection StaticNonFinalField
- */
- private static volatile int nextId = 0;
-
- private final Executor executor;
- private final Object lock = new Object();
- private final int id = nextId ++;
- private final String threadName = "SocketAcceptor-" + id;
- private final Map channels = new HashMap();
-
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
-
- private final MultiThreadSocketIoProcessor[] ioProcessors;
- private final int processorCount;
-
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
- private Selector selector;
- private Worker worker;
- private int processorDistributor = 0;
-
- /**
- * Create an acceptor with a single processing thread using a NewThreadExecutor
- */
- public MultiThreadSocketAcceptor()
- {
- this( 1, new NewThreadExecutor() );
- }
-
- /**
- * Create an acceptor with the desired number of processing threads
- *
- * @param processorCount Number of processing threads
- * @param executor Executor to use for launching threads
- */
- public MultiThreadSocketAcceptor( int processorCount, Executor executor )
- {
- if( processorCount < 1 )
- {
- throw new IllegalArgumentException( "Must have at least one processor" );
- }
-
- this.executor = executor;
- this.processorCount = processorCount;
- ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
-
- for( int i = 0; i < processorCount; i++ )
- {
- ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
- }
- }
-
-
- /**
- * Binds to the specified <code>address</code> and handles incoming connections with the specified
- * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
- *
- * @throws IOException if failed to bind
- */
- public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
- {
- if( handler == null )
- {
- throw new NullPointerException( "handler" );
- }
-
- if( address != null && !( address instanceof InetSocketAddress ) )
- {
- throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
- }
-
- if( config == null )
- {
- config = getDefaultConfig();
- }
-
- RegistrationRequest request = new RegistrationRequest( address, handler, config );
-
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
-
- startupWorker();
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
-
- if( request.exception != null )
- {
- throw request.exception;
- }
- }
-
-
- private synchronized void startupWorker() throws IOException
- {
- synchronized( lock )
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
-
- executor.execute( new NamePreservingRunnable( worker ) );
- }
- }
- }
-
- public void unbind( SocketAddress address )
- {
- if( address == null )
- {
- throw new NullPointerException( "address" );
- }
-
- CancellationRequest request = new CancellationRequest( address );
-
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
-
- if( request.exception != null )
- {
- request.exception.fillInStackTrace();
-
- throw request.exception;
- }
- }
-
-
- private class Worker implements Runnable
- {
- public void run()
- {
- Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
-
- for( ; ; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
-
- if( nKeys > 0 )
- {
- processSessions( selector.selectedKeys() );
- }
-
- cancelKeys();
-
- if( selector.keys().isEmpty() )
- {
- synchronized( lock )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e1 );
- }
- }
- }
- }
-
- private void processSessions( Set keys ) throws IOException
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- it.remove();
-
- if( !key.isAcceptable() )
- {
- continue;
- }
-
- ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
-
- SocketChannel ch = ssc.accept();
-
- if( ch == null )
- {
- continue;
- }
-
- boolean success = false;
- try
- {
-
- RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-
- MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
- MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
- req.config, ch, req.handler, req.address );
-
- // New Interface
-// SocketSessionImpl session = new SocketSessionImpl(
-// SocketAcceptor.this, nextProcessor(), getListeners(),
-// req.config, ch, req.handler, req.address );
-
-
- getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
- req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
- req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
- session.getIoProcessor().addNew( session );
- success = true;
- }
- catch( Throwable t )
- {
- ExceptionMonitor.getInstance().exceptionCaught( t );
- }
- finally
- {
- if( !success )
- {
- ch.close();
- }
- }
- }
- }
- }
-
- private MultiThreadSocketIoProcessor nextProcessor()
- {
- return ioProcessors[processorDistributor++ % processorCount];
- }
-
-
- private void registerNew()
- {
- if( registerQueue.isEmpty() )
- {
- return;
- }
-
- for( ; ; )
- {
- RegistrationRequest req;
-
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- {
- break;
- }
-
- ServerSocketChannel ssc = null;
-
- try
- {
- ssc = ServerSocketChannel.open();
- ssc.configureBlocking( false );
-
- // Configure the server socket,
- SocketAcceptorConfig cfg;
- if( req.config instanceof SocketAcceptorConfig )
- {
- cfg = ( SocketAcceptorConfig ) req.config;
- }
- else
- {
- cfg = ( SocketAcceptorConfig ) getDefaultConfig();
- }
-
- ssc.socket().setReuseAddress( cfg.isReuseAddress() );
- ssc.socket().setReceiveBufferSize(
- ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
-
- // and bind.
- ssc.socket().bind( req.address, cfg.getBacklog() );
- if( req.address == null || req.address.getPort() == 0 )
- {
- req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
- }
- ssc.register( selector, SelectionKey.OP_ACCEPT, req );
-
- synchronized( channels )
- {
- channels.put( req.address, ssc );
- }
-
- getListeners().fireServiceActivated(
- this, req.address, req.handler, req.config );
- }
- catch( IOException e )
- {
- req.exception = e;
- }
- finally
- {
- synchronized( req )
- {
- req.done = true;
-
- req.notifyAll();
- }
-
- if( ssc != null && req.exception != null )
- {
- try
- {
- ssc.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
- }
- }
-
-
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- {
- return;
- }
-
- for( ; ; )
- {
- CancellationRequest request;
-
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
-
- if( request == null )
- {
- break;
- }
-
- ServerSocketChannel ssc;
- synchronized( channels )
- {
- ssc = ( ServerSocketChannel ) channels.remove( request.address );
- }
-
- // close the channel
- try
- {
- if( ssc == null )
- {
- request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
- }
- else
- {
- SelectionKey key = ssc.keyFor( selector );
- request.registrationRequest = ( RegistrationRequest ) key.attachment();
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
-
- ssc.close();
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- synchronized( request )
- {
- request.done = true;
- request.notifyAll();
- }
-
- if( request.exception == null )
- {
- getListeners().fireServiceDeactivated(
- this, request.address,
- request.registrationRequest.handler,
- request.registrationRequest.config );
- }
- }
- }
- }
-
- private static class RegistrationRequest
- {
- private InetSocketAddress address;
- private final IoHandler handler;
- private final IoServiceConfig config;
- private IOException exception;
- private boolean done;
-
- private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
- {
- this.address = ( InetSocketAddress ) address;
- this.handler = handler;
- this.config = config;
- }
- }
-
-
- private static class CancellationRequest
- {
- private final SocketAddress address;
- private boolean done;
- private RegistrationRequest registrationRequest;
- private RuntimeException exception;
-
- private CancellationRequest( SocketAddress address )
- {
- this.address = address;
- }
- }
-}