summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/mina
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/mina')
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java467
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java227
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java351
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java272
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java197
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java440
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java547
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java486
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java1026
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java240
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java488
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java151
14 files changed, 5007 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
new file mode 100644
index 0000000000..0c311b6645
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -0,0 +1,467 @@
+package org.apache.mina.common;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.nio.*;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
+{
+
+
+ private static final int MINIMUM_CAPACITY = 1;
+
+ public FixedSizeByteBufferAllocator ()
+ {
+ }
+
+ public ByteBuffer allocate( int capacity, boolean direct )
+ {
+ java.nio.ByteBuffer nioBuffer;
+ if( direct )
+ {
+ nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity );
+ }
+ else
+ {
+ nioBuffer = java.nio.ByteBuffer.allocate( capacity );
+ }
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer )
+ {
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public void dispose()
+ {
+ }
+
+
+
+ private static final class FixedSizeByteBuffer extends ByteBuffer
+ {
+ private java.nio.ByteBuffer buf;
+ private int mark = -1;
+
+
+ protected FixedSizeByteBuffer( java.nio.ByteBuffer buf )
+ {
+ this.buf = buf;
+ buf.order( ByteOrder.BIG_ENDIAN );
+ }
+
+ public synchronized void acquire()
+ {
+ }
+
+ public void release()
+ {
+ }
+
+ public java.nio.ByteBuffer buf()
+ {
+ return buf;
+ }
+
+ public boolean isPooled()
+ {
+ return false;
+ }
+
+ public void setPooled( boolean pooled )
+ {
+ }
+
+ public ByteBuffer duplicate() {
+ return new FixedSizeByteBuffer( this.buf.duplicate() );
+ }
+
+ public ByteBuffer slice() {
+ return new FixedSizeByteBuffer( this.buf.slice() );
+ }
+
+ public ByteBuffer asReadOnlyBuffer() {
+ return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() );
+ }
+
+ public byte[] array()
+ {
+ return buf.array();
+ }
+
+ public int arrayOffset()
+ {
+ return buf.arrayOffset();
+ }
+
+ public boolean isDirect()
+ {
+ return buf.isDirect();
+ }
+
+ public boolean isReadOnly()
+ {
+ return buf.isReadOnly();
+ }
+
+ public int capacity()
+ {
+ return buf.capacity();
+ }
+
+ public ByteBuffer capacity( int newCapacity )
+ {
+ if( newCapacity > capacity() )
+ {
+ throw new IllegalArgumentException();
+ }
+
+ return this;
+ }
+
+
+
+ public boolean isAutoExpand()
+ {
+ return false;
+ }
+
+ public ByteBuffer setAutoExpand( boolean autoExpand )
+ {
+ if(autoExpand) throw new IllegalArgumentException();
+ else return this;
+ }
+
+ public ByteBuffer expand( int pos, int expectedRemaining )
+ {
+ int end = pos + expectedRemaining;
+ if( end > capacity() )
+ {
+ // The buffer needs expansion.
+ capacity( end );
+ }
+
+ if( end > limit() )
+ {
+ // We call limit() directly to prevent StackOverflowError
+ buf.limit( end );
+ }
+ return this;
+ }
+
+ public int position()
+ {
+ return buf.position();
+ }
+
+ public ByteBuffer position( int newPosition )
+ {
+
+ buf.position( newPosition );
+ if( mark > newPosition )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public int limit()
+ {
+ return buf.limit();
+ }
+
+ public ByteBuffer limit( int newLimit )
+ {
+ buf.limit( newLimit );
+ if( mark > newLimit )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public ByteBuffer mark()
+ {
+ buf.mark();
+ mark = position();
+ return this;
+ }
+
+ public int markValue()
+ {
+ return mark;
+ }
+
+ public ByteBuffer reset()
+ {
+ buf.reset();
+ return this;
+ }
+
+ public ByteBuffer clear()
+ {
+ buf.clear();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer flip()
+ {
+ buf.flip();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer rewind()
+ {
+ buf.rewind();
+ mark = -1;
+ return this;
+ }
+
+ public byte get()
+ {
+ return buf.get();
+ }
+
+ public ByteBuffer put( byte b )
+ {
+ buf.put( b );
+ return this;
+ }
+
+ public byte get( int index )
+ {
+ return buf.get( index );
+ }
+
+ public ByteBuffer put( int index, byte b )
+ {
+ buf.put( index, b );
+ return this;
+ }
+
+ public ByteBuffer get( byte[] dst, int offset, int length )
+ {
+ buf.get( dst, offset, length );
+ return this;
+ }
+
+ public ByteBuffer put( java.nio.ByteBuffer src )
+ {
+ buf.put( src );
+ return this;
+ }
+
+ public ByteBuffer put( byte[] src, int offset, int length )
+ {
+ buf.put( src, offset, length );
+ return this;
+ }
+
+ public ByteBuffer compact()
+ {
+ buf.compact();
+ mark = -1;
+ return this;
+ }
+
+ public ByteOrder order()
+ {
+ return buf.order();
+ }
+
+ public ByteBuffer order( ByteOrder bo )
+ {
+ buf.order( bo );
+ return this;
+ }
+
+ public char getChar()
+ {
+ return buf.getChar();
+ }
+
+ public ByteBuffer putChar( char value )
+ {
+ buf.putChar( value );
+ return this;
+ }
+
+ public char getChar( int index )
+ {
+ return buf.getChar( index );
+ }
+
+ public ByteBuffer putChar( int index, char value )
+ {
+ buf.putChar( index, value );
+ return this;
+ }
+
+ public CharBuffer asCharBuffer()
+ {
+ return buf.asCharBuffer();
+ }
+
+ public short getShort()
+ {
+ return buf.getShort();
+ }
+
+ public ByteBuffer putShort( short value )
+ {
+ buf.putShort( value );
+ return this;
+ }
+
+ public short getShort( int index )
+ {
+ return buf.getShort( index );
+ }
+
+ public ByteBuffer putShort( int index, short value )
+ {
+ buf.putShort( index, value );
+ return this;
+ }
+
+ public ShortBuffer asShortBuffer()
+ {
+ return buf.asShortBuffer();
+ }
+
+ public int getInt()
+ {
+ return buf.getInt();
+ }
+
+ public ByteBuffer putInt( int value )
+ {
+ buf.putInt( value );
+ return this;
+ }
+
+ public int getInt( int index )
+ {
+ return buf.getInt( index );
+ }
+
+ public ByteBuffer putInt( int index, int value )
+ {
+ buf.putInt( index, value );
+ return this;
+ }
+
+ public IntBuffer asIntBuffer()
+ {
+ return buf.asIntBuffer();
+ }
+
+ public long getLong()
+ {
+ return buf.getLong();
+ }
+
+ public ByteBuffer putLong( long value )
+ {
+ buf.putLong( value );
+ return this;
+ }
+
+ public long getLong( int index )
+ {
+ return buf.getLong( index );
+ }
+
+ public ByteBuffer putLong( int index, long value )
+ {
+ buf.putLong( index, value );
+ return this;
+ }
+
+ public LongBuffer asLongBuffer()
+ {
+ return buf.asLongBuffer();
+ }
+
+ public float getFloat()
+ {
+ return buf.getFloat();
+ }
+
+ public ByteBuffer putFloat( float value )
+ {
+ buf.putFloat( value );
+ return this;
+ }
+
+ public float getFloat( int index )
+ {
+ return buf.getFloat( index );
+ }
+
+ public ByteBuffer putFloat( int index, float value )
+ {
+ buf.putFloat( index, value );
+ return this;
+ }
+
+ public FloatBuffer asFloatBuffer()
+ {
+ return buf.asFloatBuffer();
+ }
+
+ public double getDouble()
+ {
+ return buf.getDouble();
+ }
+
+ public ByteBuffer putDouble( double value )
+ {
+ buf.putDouble( value );
+ return this;
+ }
+
+ public double getDouble( int index )
+ {
+ return buf.getDouble( index );
+ }
+
+ public ByteBuffer putDouble( int index, double value )
+ {
+ buf.putDouble( index, value );
+ return this;
+ }
+
+ public DoubleBuffer asDoubleBuffer()
+ {
+ return buf.asDoubleBuffer();
+ }
+
+
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
new file mode 100644
index 0000000000..4fd28c4eb5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFutureListener;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * A default implementation of {@link org.apache.mina.common.IoFuture}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+public class DefaultIoFuture implements IoFuture
+{
+ private final IoSession session;
+ private final Object lock;
+ private List listeners;
+ private Object result;
+ private boolean ready;
+
+
+ /**
+ * Creates a new instance.
+ *
+ * @param session an {@link IoSession} which is associated with this future
+ */
+ public DefaultIoFuture( IoSession session )
+ {
+ this.session = session;
+ this.lock = this;
+ }
+
+ /**
+ * Creates a new instance which uses the specified object as a lock.
+ */
+ public DefaultIoFuture( IoSession session, Object lock )
+ {
+ if( lock == null )
+ {
+ throw new NullPointerException( "lock" );
+ }
+ this.session = session;
+ this.lock = lock;
+ }
+
+ public IoSession getSession()
+ {
+ return session;
+ }
+
+ public Object getLock()
+ {
+ return lock;
+ }
+
+ public void join()
+ {
+ synchronized( lock )
+ {
+ while( !ready )
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+ }
+
+ public boolean join( long timeoutInMillis )
+ {
+ long startTime = ( timeoutInMillis <= 0 ) ? 0 : System
+ .currentTimeMillis();
+ long waitTime = timeoutInMillis;
+
+ synchronized( lock )
+ {
+ if( ready )
+ {
+ return ready;
+ }
+ else if( waitTime <= 0 )
+ {
+ return ready;
+ }
+
+ for( ;; )
+ {
+ try
+ {
+ lock.wait( waitTime );
+ }
+ catch( InterruptedException e )
+ {
+ }
+
+ if( ready )
+ return true;
+ else
+ {
+ waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime );
+ if( waitTime <= 0 )
+ {
+ return ready;
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isReady()
+ {
+ synchronized( lock )
+ {
+ return ready;
+ }
+ }
+
+ /**
+ * Sets the result of the asynchronous operation, and mark it as finished.
+ */
+ protected void setValue( Object newValue )
+ {
+ synchronized( lock )
+ {
+ // Allow only once.
+ if( ready )
+ {
+ return;
+ }
+
+ result = newValue;
+ ready = true;
+ lock.notifyAll();
+
+ notifyListeners();
+ }
+ }
+
+ /**
+ * Returns the result of the asynchronous operation.
+ */
+ protected Object getValue()
+ {
+ synchronized( lock )
+ {
+ return result;
+ }
+ }
+
+ public void addListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ if(listeners == null)
+ {
+ listeners = new ArrayList();
+ }
+ listeners.add( listener );
+ if( ready )
+ {
+ listener.operationComplete( this );
+ }
+ }
+ }
+
+ public void removeListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ private void notifyListeners()
+ {
+ synchronized( lock )
+ {
+
+ if(listeners != null)
+ {
+
+ for( Iterator i = listeners.iterator(); i.hasNext(); ) {
+ ( ( IoFutureListener ) i.next() ).operationComplete( this );
+ }
+ }
+ }
+ }
+}
+
+
+
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java b/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
new file mode 100644
index 0000000000..5723ffbaa9
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.IdentityHashSet;
+
+/**
+ * A helper which provides addition and removal of {@link IoServiceListener}s and firing
+ * events.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 446526 $, $Date: 2006-09-15 01:44:11 -0400 (Fri, 15 Sep 2006) $
+ */
+public class IoServiceListenerSupport
+{
+ /**
+ * A list of {@link IoServiceListener}s.
+ */
+ private final List listeners = new ArrayList();
+
+ /**
+ * Tracks managed <tt>serviceAddress</tt>es.
+ */
+ private final Set managedServiceAddresses = new HashSet();
+
+ /**
+ * Tracks managed sesssions with <tt>serviceAddress</tt> as a key.
+ */
+ private final Map managedSessions = new HashMap();
+
+ /**
+ * Creates a new instance.
+ */
+ public IoServiceListenerSupport()
+ {
+ }
+
+ /**
+ * Adds a new listener.
+ */
+ public void add( IoServiceListener listener )
+ {
+ synchronized( listeners )
+ {
+ listeners.add( listener );
+ }
+ }
+
+ /**
+ * Removes an existing listener.
+ */
+ public void remove( IoServiceListener listener )
+ {
+ synchronized( listeners )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ public Set getManagedServiceAddresses()
+ {
+ return Collections.unmodifiableSet( managedServiceAddresses );
+ }
+
+ public boolean isManaged( SocketAddress serviceAddress )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ return managedServiceAddresses.contains( serviceAddress );
+ }
+ }
+
+ public Set getManagedSessions( SocketAddress serviceAddress )
+ {
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ if( sessions == null )
+ {
+ sessions = new IdentityHashSet();
+ }
+ }
+
+ synchronized( sessions )
+ {
+ return new IdentityHashSet( sessions );
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
+ * for all registered listeners.
+ */
+ public void fireServiceActivated(
+ IoService service, SocketAddress serviceAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ if( !managedServiceAddresses.add( serviceAddress ) )
+ {
+ return;
+ }
+ }
+
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).serviceActivated(
+ service, serviceAddress, handler, config );
+ }
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
+ * for all registered listeners.
+ */
+ public synchronized void fireServiceDeactivated(
+ IoService service, SocketAddress serviceAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ if( !managedServiceAddresses.remove( serviceAddress ) )
+ {
+ return;
+ }
+ }
+
+ try
+ {
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).serviceDeactivated(
+ service, serviceAddress, handler, config );
+ }
+ }
+ }
+ finally
+ {
+ disconnectSessions( serviceAddress, config );
+ }
+ }
+
+
+ /**
+ * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
+ */
+ public void fireSessionCreated( IoSession session )
+ {
+ SocketAddress serviceAddress = session.getServiceAddress();
+
+ // Get the session set.
+ boolean firstSession = false;
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ if( sessions == null )
+ {
+ sessions = new IdentityHashSet();
+ managedSessions.put( serviceAddress, sessions );
+ firstSession = true;
+ }
+ }
+
+ // If already registered, ignore.
+ synchronized( sessions )
+ {
+ if ( !sessions.add( session ) )
+ {
+ return;
+ }
+ }
+
+ // If the first connector session, fire a virtual service activation event.
+ if( session.getService() instanceof IoConnector && firstSession )
+ {
+ fireServiceActivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+
+ // Fire session events.
+ session.getFilterChain().fireSessionCreated( session );
+ session.getFilterChain().fireSessionOpened( session);
+
+ // Fire listener events.
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).sessionCreated( session );
+ }
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
+ */
+ public void fireSessionDestroyed( IoSession session )
+ {
+ SocketAddress serviceAddress = session.getServiceAddress();
+
+ // Get the session set.
+ Set sessions;
+ boolean lastSession = false;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ // Ignore if unknown.
+ if( sessions == null )
+ {
+ return;
+ }
+
+ // Try to remove the remaining empty seession set after removal.
+ synchronized( sessions )
+ {
+ sessions.remove( session );
+ if( sessions.isEmpty() )
+ {
+ managedSessions.remove( serviceAddress );
+ lastSession = true;
+ }
+ }
+ }
+
+ // Fire session events.
+ session.getFilterChain().fireSessionClosed( session );
+
+ // Fire listener events.
+ try
+ {
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).sessionDestroyed( session );
+ }
+ }
+ }
+ finally
+ {
+ // Fire a virtual service deactivation event for the last session of the connector.
+ //TODO double-check that this is *STILL* the last session. May not be the case
+ if( session.getService() instanceof IoConnector && lastSession )
+ {
+ fireServiceDeactivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+ }
+ }
+
+ private void disconnectSessions( SocketAddress serviceAddress, IoServiceConfig config )
+ {
+ if( !( config instanceof IoAcceptorConfig ) )
+ {
+ return;
+ }
+
+ if( !( ( IoAcceptorConfig ) config ).isDisconnectOnUnbind() )
+ {
+ return;
+ }
+
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ }
+
+ if( sessions == null )
+ {
+ return;
+ }
+
+ Set sessionsCopy;
+
+ // Create a copy to avoid ConcurrentModificationException
+ synchronized( sessions )
+ {
+ sessionsCopy = new IdentityHashSet( sessions );
+ }
+
+ final CountDownLatch latch = new CountDownLatch(sessionsCopy.size());
+
+ for( Iterator i = sessionsCopy.iterator(); i.hasNext(); )
+ {
+ ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
+ {
+ public void operationComplete( IoFuture future )
+ {
+ latch.countDown();
+ }
+ } );
+ }
+
+ try
+ {
+ latch.await();
+ }
+ catch( InterruptedException ie )
+ {
+ // Ignored
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
new file mode 100644
index 0000000000..47f19aa76d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
@@ -0,0 +1,48 @@
+package org.apache.mina.filter;
+
+import org.apache.mina.common.IoFilter;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class WriteBufferFullExeception extends RuntimeException
+{
+ private IoFilter.WriteRequest _writeRequest;
+
+ public WriteBufferFullExeception()
+ {
+ this(null);
+ }
+
+ public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest)
+ {
+ _writeRequest = writeRequest;
+ }
+
+
+ public void setWriteRequest(IoFilter.WriteRequest writeRequest)
+ {
+ _writeRequest = writeRequest;
+ }
+
+ public IoFilter.WriteRequest getWriteRequest()
+ {
+ return _writeRequest;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
new file mode 100644
index 0000000000..4e9db9071a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than
+ * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the
+ * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to
+ * cause an Out of Memory exception due to a back log of unprocessed messages.
+ *
+ * This is should only be viewed as a temporary work around for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter as this issue will always occur. On a machine
+ * where the network is slower than the local producer.
+ *
+ * Suggested improvement is to allow implementation of policices on what to do when buffer is full.
+ *
+ * They could be:
+ * Block - As this does
+ * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
+ *
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class WriteBufferLimitFilterBuilder
+{
+ public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";
+
+ private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;
+
+ private volatile boolean throwNotBlock = false;
+
+ private volatile int maximumConnectionBufferCount;
+ private volatile long maximumConnectionBufferSize;
+
+ private final Object _blockLock = new Object();
+
+ private int _blockWaiters = 0;
+
+
+ public WriteBufferLimitFilterBuilder()
+ {
+ this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
+ }
+
+ public WriteBufferLimitFilterBuilder(int maxWriteBufferSize)
+ {
+ setMaximumConnectionBufferCount(maxWriteBufferSize);
+ }
+
+
+ /**
+ * Set the maximum amount pending items in the writeQueue for a given session.
+ * Changing the value will only take effect when new data is received for a
+ * connection, including existing connections. Default value is 5000 msgs.
+ *
+ * @param maximumConnectionBufferCount New buffer size. Must be > 0
+ */
+ public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount)
+ {
+ this.maximumConnectionBufferCount = maximumConnectionBufferCount;
+ this.maximumConnectionBufferSize = 0;
+ }
+
+ public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize)
+ {
+ this.maximumConnectionBufferSize = maximumConnectionBufferSize;
+ this.maximumConnectionBufferCount = 0;
+ }
+
+ /**
+ * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself
+ * before and after that filter.
+ *
+ * @param chain {@link IoFilterChain} to attach self to.
+ */
+ public void attach(IoFilterChain chain)
+ {
+ String name = getThreadPoolFilterEntryName(chain.getAll());
+
+ chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+ }
+
+ /**
+ * Attach this filter to the specified builder. It will search for the
+ * {@link ExecutorFilter}, and attach itself before and after that filter.
+ *
+ * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
+ */
+ public void attach(DefaultIoFilterChainBuilder builder)
+ {
+ String name = getThreadPoolFilterEntryName(builder.getAll());
+
+ builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+ }
+
+ private String getThreadPoolFilterEntryName(List entries)
+ {
+ Iterator i = entries.iterator();
+
+ while (i.hasNext())
+ {
+ IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next();
+
+ if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class))
+ {
+ return entry.getName();
+ }
+ }
+
+ throw new IllegalStateException("Chain does not contain a ExecutorFilter");
+ }
+
+
+ public class SendLimit extends IoFilterAdapter
+ {
+ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ {
+ try
+ {
+ waitTillSendAllowed(session);
+ }
+ catch (WriteBufferFullExeception wbfe)
+ {
+ nextFilter.exceptionCaught(session, wbfe);
+ }
+
+ if (writeRequest.getMessage() instanceof ByteBuffer)
+ {
+ increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage());
+ }
+
+ nextFilter.filterWrite(session, writeRequest);
+ }
+
+ private void increasePendingWriteSize(IoSession session, ByteBuffer message)
+ {
+ synchronized (session)
+ {
+ Long pendingSize = getScheduledWriteBytes(session) + message.remaining();
+ session.setAttribute(PENDING_SIZE, pendingSize);
+ }
+ }
+
+ private boolean sendAllowed(IoSession session)
+ {
+ if (session.isClosing())
+ {
+ return true;
+ }
+
+ int lmswm = maximumConnectionBufferCount;
+ long lmswb = maximumConnectionBufferSize;
+
+ return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm)
+ && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb);
+ }
+
+ private long getScheduledWriteBytes(IoSession session)
+ {
+ synchronized (session)
+ {
+ Long i = (Long) session.getAttribute(PENDING_SIZE);
+ return null == i ? 0 : i;
+ }
+ }
+
+ private void waitTillSendAllowed(IoSession session)
+ {
+ synchronized (_blockLock)
+ {
+ if (throwNotBlock)
+ {
+ throw new WriteBufferFullExeception();
+ }
+
+ _blockWaiters++;
+
+ while (!sendAllowed(session))
+ {
+ try
+ {
+ _blockLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore.
+ }
+ }
+ _blockWaiters--;
+ }
+ }
+
+ public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
+ {
+ if (message instanceof ByteBuffer)
+ {
+ decrementPendingWriteSize(session, (ByteBuffer) message);
+ }
+ notifyWaitingWriters();
+ nextFilter.messageSent(session, message);
+ }
+
+ private void decrementPendingWriteSize(IoSession session, ByteBuffer message)
+ {
+ synchronized (session)
+ {
+ session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining());
+ }
+ }
+
+ private void notifyWaitingWriters()
+ {
+ synchronized (_blockLock)
+ {
+ if (_blockWaiters != 0)
+ {
+ _blockLock.notifyAll();
+ }
+ }
+
+ }
+
+ }//SentLimit
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
new file mode 100644
index 0000000000..3f7e206cb4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ * extends CumulativeProtocolDecoder {
+ *
+ * private Command parseCommand(ByteBuffer in) {
+ * // Convert the bytes in the specified buffer to a
+ * // Command object.
+ * ...
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out)
+ * throws Exception {
+ *
+ * // Remember the initial position.
+ * int start = in.position();
+ *
+ * // Now find the first CRLF in the buffer.
+ * byte previous = 0;
+ * while (in.hasRemaining()) {
+ * byte current = in.get();
+ *
+ * if (previous == '\r' && current == '\n') {
+ * // Remember the current position and limit.
+ * int position = in.position();
+ * int limit = in.limit();
+ * try {
+ * in.position(start);
+ * in.limit(position);
+ * // The bytes between in.position() and in.limit()
+ * // now contain a full CRLF terminated line.
+ * out.write(parseCommand(in.slice()));
+ * } finally {
+ * // Set the position to point right after the
+ * // detected line and set the limit to the old
+ * // one.
+ * in.position(position);
+ * in.limit(limit);
+ * }
+ * // Decoded one line; CumulativeProtocolDecoder will
+ * // call me again until I return false. So just
+ * // return true until there are no more lines in the
+ * // buffer.
+ * return true;
+ * }
+ *
+ * previous = current;
+ * }
+ *
+ * // Could not find CRLF in the buffer. Reset the initial
+ * // position to the one we recorded above.
+ * in.position(start);
+ *
+ * return false;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+ private static final String BUFFER = OurCumulativeProtocolDecoder.class
+ .getName()
+ + ".Buffer";
+
+ /**
+ * Creates a new instance.
+ */
+ protected OurCumulativeProtocolDecoder() {
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is NOT compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+ boolean usingSessionBuffer = true;
+ ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
+ // If we have a session buffer, append data to that; otherwise
+ // use the buffer read from the network directly.
+ if (buf != null) {
+ buf.put(in);
+ buf.flip();
+ } else {
+ buf = in;
+ usingSessionBuffer = false;
+ }
+
+ for (;;) {
+ int oldPos = buf.position();
+ boolean decoded = doDecode(session, buf, out);
+ if (decoded) {
+ if (buf.position() == oldPos) {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if (buf.hasRemaining()) {
+ storeRemainingInSession(buf, session);
+ } else {
+ if (usingSessionBuffer)
+ removeSessionBuffer(session);
+ }
+ }
+
+ /**
+ * Implement this method to consume the specified cumulative buffer and
+ * decode its content into message(s).
+ *
+ * @param in the cumulative buffer
+ * @return <tt>true</tt> if and only if there's more to decode in the buffer
+ * and you want to have <tt>doDecode</tt> method invoked again.
+ * Return <tt>false</tt> if remaining data is not enough to decode,
+ * then this method will be invoked again when more data is cumulated.
+ * @throws Exception if cannot decode <tt>in</tt>.
+ */
+ protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception;
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose(IoSession session) throws Exception {
+ removeSessionBuffer(session);
+ }
+
+ private void removeSessionBuffer(IoSession session) {
+ ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
+ if (buf != null) {
+ buf.release();
+ }
+ }
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+ ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
+ remainingBuf.setAutoExpand(true);
+ remainingBuf.order(buf.order());
+ remainingBuf.put(buf);
+ session.setAttribute(BUFFER, remainingBuf);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
new file mode 100644
index 0000000000..b8c6f29720
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
@@ -0,0 +1,440 @@
+package org.apache.mina.filter.codec;
+
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultWriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.util.SessionLog;
+import org.apache.mina.util.Queue;
+
+
+public class QpidProtocolCodecFilter extends IoFilterAdapter
+{
+ public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
+
+ private static final Class[] EMPTY_PARAMS = new Class[0];
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
+
+ private final ProtocolCodecFactory factory;
+
+ public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+ if( factory == null )
+ {
+ throw new NullPointerException( "factory" );
+ }
+ this.factory = factory;
+ }
+
+ public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+ {
+ if( encoder == null )
+ {
+ throw new NullPointerException( "encoder" );
+ }
+ if( decoder == null )
+ {
+ throw new NullPointerException( "decoder" );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder()
+ {
+ return encoder;
+ }
+
+ public ProtocolDecoder getDecoder()
+ {
+ return decoder;
+ }
+ };
+ }
+
+ public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+ {
+ if( encoderClass == null )
+ {
+ throw new NullPointerException( "encoderClass" );
+ }
+ if( decoderClass == null )
+ {
+ throw new NullPointerException( "decoderClass" );
+ }
+ if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+ {
+ throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+ }
+ if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+ {
+ throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+ }
+ try
+ {
+ encoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+ }
+ try
+ {
+ decoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder() throws Exception
+ {
+ return ( ProtocolEncoder ) encoderClass.newInstance();
+ }
+
+ public ProtocolDecoder getDecoder() throws Exception
+ {
+ return ( ProtocolDecoder ) decoderClass.newInstance();
+ }
+ };
+ }
+
+ public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+ {
+ if( parent.contains( ProtocolCodecFilter.class ) )
+ {
+ throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+ }
+ }
+
+ public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( !( message instanceof ByteBuffer ) )
+ {
+ nextFilter.messageReceived( session, message );
+ return;
+ }
+
+ ByteBuffer in = ( ByteBuffer ) message;
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+ try
+ {
+ decoder.decode( session, in, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ pde.setHexdump( in.getHexDump() );
+ throw pde;
+ }
+ finally
+ {
+ // Dispose the decoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeDecoder( session );
+ }
+
+ // Release the read buffer.
+ in.release();
+
+ decoderOut.flush();
+ }
+ }
+
+ public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( message instanceof HiddenByteBuffer )
+ {
+ return;
+ }
+
+ if( !( message instanceof MessageByteBuffer ) )
+ {
+ nextFilter.messageSent( session, message );
+ return;
+ }
+
+ nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
+ }
+
+ public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
+ {
+ Object message = writeRequest.getMessage();
+ if( message instanceof ByteBuffer )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ return;
+ }
+
+ ProtocolEncoder encoder = getEncoder( session );
+ ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
+
+ try
+ {
+ encoder.encode( session, message, encoderOut );
+ encoderOut.flush();
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ new MessageByteBuffer( writeRequest.getMessage() ),
+ writeRequest.getFuture(), writeRequest.getDestination() ) );
+ }
+ catch( Throwable t )
+ {
+ ProtocolEncoderException pee;
+ if( t instanceof ProtocolEncoderException )
+ {
+ pee = ( ProtocolEncoderException ) t;
+ }
+ else
+ {
+ pee = new ProtocolEncoderException( t );
+ }
+ throw pee;
+ }
+ finally
+ {
+ // Dispose the encoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeEncoder( session );
+ }
+ }
+ }
+
+ public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ // Call finishDecode() first when a connection is closed.
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+ try
+ {
+ decoder.finishDecode( session, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ throw pde;
+ }
+ finally
+ {
+ // Dispose all.
+ disposeEncoder( session );
+ disposeDecoder( session );
+
+ decoderOut.flush();
+ }
+
+ nextFilter.sessionClosed( session );
+ }
+
+ private ProtocolEncoder getEncoder( IoSession session ) throws Exception
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder();
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
+ }
+
+ private ProtocolDecoder getDecoder( IoSession session ) throws Exception
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder();
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+ private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
+ {
+ return new SimpleProtocolDecoderOutput( session, nextFilter );
+ }
+
+ private void disposeEncoder( IoSession session )
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+ if( encoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ encoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Failed to dispose: " + encoder.getClass().getName() +
+ " (" + encoder + ')' );
+ }
+ }
+
+ private void disposeDecoder( IoSession session )
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+ if( decoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ decoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Falied to dispose: " + decoder.getClass().getName() +
+ " (" + decoder + ')' );
+ }
+ }
+
+ private static class HiddenByteBuffer extends ByteBufferProxy
+ {
+ private HiddenByteBuffer( ByteBuffer buf )
+ {
+ super( buf );
+ }
+ }
+
+ private static class MessageByteBuffer extends ByteBufferProxy
+ {
+ private final Object message;
+
+ private MessageByteBuffer( Object message )
+ {
+ super( EMPTY_BUFFER );
+ this.message = message;
+ }
+
+ public void acquire()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+
+ public void release()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+ }
+
+ private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
+ {
+ private ByteBuffer buffer;
+
+ private final IoSession session;
+ private final IoFilter.NextFilter nextFilter;
+ private final IoFilter.WriteRequest writeRequest;
+
+ public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ this.session = session;
+ this.nextFilter = nextFilter;
+ this.writeRequest = writeRequest;
+ }
+
+
+
+ public void write( ByteBuffer buf )
+ {
+ if(buffer != null)
+ {
+ flush();
+ }
+ buffer = buf;
+ }
+
+ public void mergeAll()
+ {
+ }
+
+ public WriteFuture flush()
+ {
+ WriteFuture future = null;
+ if( buffer == null )
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer buf = buffer;
+ // Flush only when the buffer has remaining.
+ if( buf.hasRemaining() )
+ {
+ future = doFlush( buf );
+ }
+
+ }
+
+ return future;
+ }
+
+
+ protected WriteFuture doFlush( ByteBuffer buf )
+ {
+ WriteFuture future = new DefaultWriteFuture( session );
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ buf,
+ future, writeRequest.getDestination() ) );
+ return future;
+ }
+ }
+}
+
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
new file mode 100644
index 0000000000..e5360d32e0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.NamePreservingRunnable;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class MultiThreadSocketAcceptor extends SocketAcceptor
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Executor executor;
+ private final Object lock = new Object();
+ private final int id = nextId ++;
+ private final String threadName = "SocketAcceptor-" + id;
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+
+ /**
+ * Create an acceptor with a single processing thread using a NewThreadExecutor
+ */
+ public MultiThreadSocketAcceptor()
+ {
+ this( 1, new NewThreadExecutor() );
+ }
+
+ /**
+ * Create an acceptor with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketAcceptor( int processorCount, Executor executor )
+ {
+ if( processorCount < 1 )
+ {
+ throw new IllegalArgumentException( "Must have at least one processor" );
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for( int i = 0; i < processorCount; i++ )
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
+ }
+ }
+
+
+ /**
+ * Binds to the specified <code>address</code> and handles incoming connections with the specified
+ * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
+ *
+ * @throws IOException if failed to bind
+ */
+ public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+ {
+ if( handler == null )
+ {
+ throw new NullPointerException( "handler" );
+ }
+
+ if( address != null && !( address instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+ }
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ throw request.exception;
+ }
+ }
+
+
+ private synchronized void startupWorker() throws IOException
+ {
+ synchronized( lock )
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+
+ executor.execute( new NamePreservingRunnable( worker ) );
+ }
+ }
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ CancellationRequest request = new CancellationRequest( address );
+
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( request );
+ }
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ request.exception.fillInStackTrace();
+
+ throw request.exception;
+ }
+ }
+
+
+ private class Worker implements Runnable
+ {
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
+
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( lock )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ try
+ {
+
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+
+ MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
+ MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
+ req.config, ch, req.handler, req.address );
+
+ // New Interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// SocketAcceptor.this, nextProcessor(), getListeners(),
+// req.config, ch, req.handler, req.address );
+
+
+ getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+ session.getIoProcessor().addNew( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ RegistrationRequest req;
+
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = null;
+
+ try
+ {
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking( false );
+
+ // Configure the server socket,
+ SocketAcceptorConfig cfg;
+ if( req.config instanceof SocketAcceptorConfig )
+ {
+ cfg = ( SocketAcceptorConfig ) req.config;
+ }
+ else
+ {
+ cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+ }
+
+ ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+ ssc.socket().setReceiveBufferSize(
+ ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+ // and bind.
+ ssc.socket().bind( req.address, cfg.getBacklog() );
+ if( req.address == null || req.address.getPort() == 0 )
+ {
+ req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
+ }
+ ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+ synchronized( channels )
+ {
+ channels.put( req.address, ssc );
+ }
+
+ getListeners().fireServiceActivated(
+ this, req.address, req.handler, req.config );
+ }
+ catch( IOException e )
+ {
+ req.exception = e;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+
+ req.notifyAll();
+ }
+
+ if( ssc != null && req.exception != null )
+ {
+ try
+ {
+ ssc.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+ }
+
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ CancellationRequest request;
+
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc;
+ synchronized( channels )
+ {
+ ssc = ( ServerSocketChannel ) channels.remove( request.address );
+ }
+
+ // close the channel
+ try
+ {
+ if( ssc == null )
+ {
+ request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ssc.keyFor( selector );
+ request.registrationRequest = ( RegistrationRequest ) key.attachment();
+ key.cancel();
+
+ selector.wakeup(); // wake up again to trigger thread death
+
+ ssc.close();
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notifyAll();
+ }
+
+ if( request.exception == null )
+ {
+ getListeners().fireServiceDeactivated(
+ this, request.address,
+ request.registrationRequest.handler,
+ request.registrationRequest.config );
+ }
+ }
+ }
+ }
+
+ private static class RegistrationRequest
+ {
+ private InetSocketAddress address;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+ private IOException exception;
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ this.address = ( InetSocketAddress ) address;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+
+
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+ private boolean done;
+ private RegistrationRequest registrationRequest;
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
new file mode 100644
index 0000000000..7344f70078
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class MultiThreadSocketConnector extends SocketConnector
+{
+ /** @noinspection StaticNonFinalField */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+
+ private final Queue connectQueue = new Queue();
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private final Executor executor;
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private int workerTimeout = 60; // 1 min.
+
+ /** Create a connector with a single processing thread using a NewThreadExecutor */
+ public MultiThreadSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketConnector(int processorCount, Executor executor)
+ {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * How many seconds to keep the connection thread alive between connection requests
+ *
+ * @return Number of seconds to keep connection thread alive
+ */
+ public int getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+ */
+ public void setWorkerTimeout(int workerTimeout)
+ {
+ if (workerTimeout < 0)
+ {
+ throw new IllegalArgumentException("Must be >= 0");
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+ return connect(address, null, handler, config);
+ }
+
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config)
+ {
+ if (address == null)
+ {
+ throw new NullPointerException("address");
+ }
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+
+ if (!(address instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected address type: "
+ + address.getClass());
+ }
+
+ if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected local address type: "
+ + localAddress.getClass());
+ }
+
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+
+ SocketChannel ch = null;
+ boolean success = false;
+ try
+ {
+ ch = SocketChannel.open();
+ ch.socket().setReuseAddress(true);
+ if (localAddress != null)
+ {
+ ch.socket().bind(localAddress);
+ }
+
+ ch.configureBlocking(false);
+
+ if (ch.connect(address))
+ {
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ newSession(ch, handler, config, future);
+ success = true;
+ return future;
+ }
+
+ success = true;
+ }
+ catch (IOException e)
+ {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ finally
+ {
+ if (!success && ch != null)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ ConnectionRequest request = new ConnectionRequest(ch, handler, config);
+ synchronized (lock)
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e2)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e2);
+ }
+
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ }
+
+ synchronized (connectQueue)
+ {
+ connectQueue.push(request);
+ }
+ selector.wakeup();
+
+ return request;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if (worker == null)
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute(new NamePreservingRunnable(worker));
+ }
+ }
+
+ private void registerNew()
+ {
+ if (connectQueue.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ ConnectionRequest req;
+ synchronized (connectQueue)
+ {
+ req = (ConnectionRequest) connectQueue.pop();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register(selector, SelectionKey.OP_CONNECT, req);
+ }
+ catch (IOException e)
+ {
+ req.setException(e);
+ }
+ }
+ }
+
+ private void processSessions(Set keys)
+ {
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isConnectable())
+ {
+ continue;
+ }
+
+ SocketChannel ch = (SocketChannel) key.channel();
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ newSession(ch, entry.handler, entry.config, entry);
+ success = true;
+ }
+ catch (Throwable e)
+ {
+ entry.setException(e);
+ }
+ finally
+ {
+ key.cancel();
+ if (!success)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions(Set keys)
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ if (currentTime >= entry.deadline)
+ {
+ entry.setException(new ConnectException());
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ key.cancel();
+ }
+ }
+ }
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ MultiThreadSocketSessionImpl session =
+ new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
+ config, ch, handler, ch.socket().getRemoteSocketAddress());
+
+ //new interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// this, nextProcessor(), getListeners(),
+// config, ch, handler, ch.socket().getRemoteSocketAddress() );
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+ session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
+
+ // Forward the remaining process to the SocketIoProcessor.
+ session.getIoProcessor().addNew(session);
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ private class Worker implements Runnable
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
+
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(1000);
+
+ registerNew();
+
+ if (nKeys > 0)
+ {
+ processSessions(selector.selectedKeys());
+ }
+
+ processTimedOutSessions(selector.keys());
+
+ if (selector.keys().isEmpty())
+ {
+ if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+ {
+ synchronized (lock)
+ {
+ if (selector.keys().isEmpty() &&
+ connectQueue.isEmpty())
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ private class ConnectionRequest extends DefaultConnectFuture
+ {
+ private final SocketChannel channel;
+ private final long deadline;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+
+ private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+ {
+ this.channel = channel;
+ long timeout;
+ if (config instanceof IoConnectorConfig)
+ {
+ timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+ }
+ else
+ {
+ timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+ }
+ this.deadline = System.currentTimeMillis() + timeout;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
new file mode 100644
index 0000000000..67b8c8d820
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+class MultiThreadSocketFilterChain extends AbstractIoFilterChain {
+
+ MultiThreadSocketFilterChain( IoSession parent )
+ {
+ super( parent );
+ }
+
+ protected void doWrite( IoSession session, WriteRequest writeRequest )
+ {
+ MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+ Queue writeRequestQueue = s.getWriteRequestQueue();
+
+ // SocketIoProcessor.doFlush() will reset it after write is finished
+ // because the buffer will be passed with messageSent event.
+ ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.push( writeRequest );
+ if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
+ {
+ // Notify SocketIoProcessor only when writeRequestQueue was empty.
+ s.getIoProcessor().flush( s );
+ }
+ }
+ }
+
+ protected void doClose( IoSession session ) throws IOException
+ {
+ MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+ s.getIoProcessor().remove( s );
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
new file mode 100644
index 0000000000..c23ad8686f
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.Queue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $,
+ */
+class MultiThreadSocketIoProcessor extends SocketIoProcessor
+{
+ Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
+ Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader");
+ Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer");
+
+ private static final long SELECTOR_TIMEOUT = 1000L;
+
+ private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
+ private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
+
+ private final Object readLock = new Object();
+ private final Object writeLock = new Object();
+
+ private final String threadName;
+ private final Executor executor;
+
+ private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private volatile Selector selector, writeSelector;
+
+ private final Queue newSessions = new Queue();
+ private final Queue removingSessions = new Queue();
+ private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
+ private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
+
+ private final Queue trafficControllingSessions = new Queue();
+
+ private ReadWorker readWorker;
+ private WriteWorker writeWorker;
+ private long lastIdleReadCheckTime = System.currentTimeMillis();
+ private long lastIdleWriteCheckTime = System.currentTimeMillis();
+
+ MultiThreadSocketIoProcessor(String threadName, Executor executor)
+ {
+ super(threadName, executor);
+ this.threadName = threadName;
+ this.executor = executor;
+ }
+
+ void addNew(SocketSessionImpl session) throws IOException
+ {
+ synchronized (newSessions)
+ {
+ newSessions.push(session);
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+ writeSelector.wakeup();
+ }
+
+ void remove(SocketSessionImpl session) throws IOException
+ {
+ scheduleRemove(session);
+ startupWorker();
+ selector.wakeup();
+ }
+
+ private void startupWorker() throws IOException
+ {
+ synchronized (readLock)
+ {
+ if (readWorker == null)
+ {
+ selector = Selector.open();
+ readWorker = new ReadWorker();
+ executor.execute(new NamePreservingRunnable(readWorker));
+ }
+ }
+
+ synchronized (writeLock)
+ {
+ if (writeWorker == null)
+ {
+ writeSelector = Selector.open();
+ writeWorker = new WriteWorker();
+ executor.execute(new NamePreservingRunnable(writeWorker));
+ }
+ }
+
+ }
+
+ void flush(SocketSessionImpl session)
+ {
+ scheduleFlush(session);
+ Selector selector = this.writeSelector;
+
+ if (selector != null)
+ {
+ selector.wakeup();
+ }
+ }
+
+ void updateTrafficMask(SocketSessionImpl session)
+ {
+ scheduleTrafficControl(session);
+ Selector selector = this.selector;
+ if (selector != null)
+ {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleRemove(SocketSessionImpl session)
+ {
+ synchronized (removingSessions)
+ {
+ removingSessions.push(session);
+ }
+ }
+
+ private void scheduleFlush(SocketSessionImpl session)
+ {
+ synchronized (flushingSessionsSet)
+ {
+ //if flushingSessions grows to contain Integer.MAX_VALUE sessions
+ // then this will fail.
+ if (flushingSessionsSet.add(session))
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+
+ private void scheduleTrafficControl(SocketSessionImpl session)
+ {
+ synchronized (trafficControllingSessions)
+ {
+ trafficControllingSessions.push(session);
+ }
+ }
+
+ private void doAddNewReader() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+
+ try
+ {
+
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector,
+ SelectionKey.OP_READ,
+ session));
+
+ //System.out.println("ReadDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+
+ private void doAddNewWrite() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ ch.configureBlocking(false);
+ synchronized (flushingSessionsSet)
+ {
+ flushingSessionsSet.add(session);
+ }
+
+ session.setWriteSelectionKey(ch.register(writeSelector,
+ SelectionKey.OP_WRITE,
+ session));
+
+ //System.out.println("WriteDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+
+ private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ synchronized (newSessions)
+ {
+ if (!session.created())
+ {
+ _logger.debug("Popping new session");
+ newSessions.pop();
+
+ // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+ // in AbstractIoFilterChain.fireSessionOpened().
+ session.getServiceListeners().fireSessionCreated(session);
+
+ session.doneCreation();
+ }
+ }
+ }
+
+ private void doRemove()
+ {
+ if (removingSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (removingSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) removingSessions.pop();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+ SelectionKey key = session.getReadSelectionKey();
+ SelectionKey writeKey = session.getWriteSelectionKey();
+
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.close() is called before addSession() is processed)
+ if (key == null || writeKey == null)
+ {
+ scheduleRemove(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid() || !writeKey.isValid())
+ {
+ continue;
+ }
+
+ try
+ {
+ //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
+ synchronized (readLock)
+ {
+ key.cancel();
+ }
+ synchronized (writeLock)
+ {
+ writeKey.cancel();
+ }
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ releaseWriteBuffers(session);
+ session.getServiceListeners().fireSessionDestroyed(session);
+ }
+ }
+ }
+
+ private void processRead(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
+
+ synchronized (readLock)
+ {
+ if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
+ {
+ read(session);
+ }
+ }
+
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void processWrite(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+
+ synchronized (writeLock)
+ {
+ if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
+ {
+
+ // Clear OP_WRITE
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ synchronized (flushingSessionsSet)
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void read(SocketSessionImpl session)
+ {
+
+ //if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
+ }
+
+ int totalReadBytes = 0;
+
+ while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ buf.clear();
+
+ int readBytes = 0;
+ int ret;
+
+ try
+ {
+ while ((ret = ch.read(buf.buf())) > 0)
+ {
+ readBytes += ret;
+ totalReadBytes += ret;
+ }
+ }
+ finally
+ {
+ buf.flip();
+ }
+
+
+ if (readBytes > 0)
+ {
+ session.increaseReadBytes(readBytes);
+
+ session.getFilterChain().fireMessageReceived(session, buf);
+ buf = null;
+ }
+
+ if (ret <= 0)
+ {
+ if (ret == 0)
+ {
+ if (readBytes == session.getReadBufferSize())
+ {
+ continue;
+ }
+ }
+ else
+ {
+ scheduleRemove(session);
+ }
+
+ break;
+ }
+ }
+ catch (Throwable e)
+ {
+ if (e instanceof IOException)
+ {
+ scheduleRemove(session);
+ }
+ session.getFilterChain().fireExceptionCaught(session, e);
+
+ //Stop Reading this session.
+ return;
+ }
+ finally
+ {
+ if (buf != null)
+ {
+ buf.release();
+ }
+ }
+ }//for
+
+ // if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
+ }
+ }
+
+
+ private void notifyReadIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleReadCheckTime) >= 1000)
+ {
+ lastIdleReadCheckTime = currentTime;
+ Set keys = selector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyReadIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyWriteIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleWriteCheckTime) >= 1000)
+ {
+ lastIdleWriteCheckTime = currentTime;
+ Set keys = writeSelector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyWriteIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+ IdleStatus.READER_IDLE,
+ Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+ IdleStatus.WRITER_IDLE,
+ Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime)
+ {
+ if (idleTime > 0 && lastIoTime != 0
+ && (currentTime - lastIoTime) >= idleTime)
+ {
+ session.increaseIdleCount(status);
+ session.getFilterChain().fireSessionIdle(session, status);
+ }
+ }
+
+ private void notifyWriteTimeout(SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime)
+ {
+
+ MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
+ SelectionKey key = sesh.getWriteSelectionKey();
+
+ synchronized (writeLock)
+ {
+ if (writeTimeout > 0
+ && (currentTime - lastIoTime) >= writeTimeout
+ && key != null && key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ {
+ session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+ }
+ }
+ }
+
+ private SocketSessionImpl getNextFlushingSession()
+ {
+ return (SocketSessionImpl) flushingSessions.poll();
+ }
+
+ private void releaseSession(SocketSessionImpl session)
+ {
+ synchronized (session.getWriteRequestQueue())
+ {
+ synchronized (flushingSessionsSet)
+ {
+ if (session.getScheduledWriteRequests() > 0)
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
+ }
+ flushingSessions.offer(session);
+ }
+ else
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
+ }
+ flushingSessionsSet.remove(session);
+ }
+ }
+ }
+ }
+
+ private void releaseWriteBuffers(SocketSessionImpl session)
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequest req;
+
+ //Should this be synchronized?
+ synchronized (writeRequestQueue)
+ {
+ while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
+ {
+ try
+ {
+ ((ByteBuffer) req.getMessage()).release();
+ }
+ catch (IllegalStateException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ req.getFuture().setWritten(false);
+ }
+ }
+ }
+ }
+
+ private void doFlush()
+ {
+ MultiThreadSocketSessionImpl session;
+
+ while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
+ {
+ if (!session.isConnected())
+ {
+ releaseWriteBuffers(session);
+ releaseSession(session);
+ continue;
+ }
+
+ SelectionKey key = session.getWriteSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleFlush(session);
+ releaseSession(session);
+ continue;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ releaseSession(session);
+ continue;
+ }
+
+ try
+ {
+ if (doFlush(session))
+ {
+ releaseSession(session);
+ }
+ }
+ catch (IOException e)
+ {
+ releaseSession(session);
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+
+ }
+
+ }
+
+ private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ // Clear OP_WRITE
+ SelectionKey key = session.getWriteSelectionKey();
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ SocketChannel ch = session.getChannel();
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ long totalFlushedBytes = 0;
+ while (true)
+ {
+ WriteRequest req;
+
+ synchronized (writeRequestQueue)
+ {
+ req = (WriteRequest) writeRequestQueue.first();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0)
+ {
+ synchronized (writeRequestQueue)
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+
+ int writtenBytes = 0;
+
+ // Reported as DIRMINA-362
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+ if (key.isWritable())
+ {
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
+ }
+
+ if (writtenBytes > 0)
+ {
+ session.increaseWrittenBytes(writtenBytes);
+ }
+
+ if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
+ {
+ // Kernel buffer is full
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ }
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return false;
+ }
+ }
+
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return true;
+ }
+
+ private void doUpdateTrafficMask()
+ {
+ if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
+ {
+ return;
+ }
+
+ // Synchronize over entire operation as this method should be called
+ // from both read and write thread and we don't want the order of the
+ // updates to get changed.
+ trafficMaskUpdateLock.lock();
+ try
+ {
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SelectionKey key = session.getReadSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleTrafficControl(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, set OP_WRITE to trigger flushing.
+
+ //Sset to Read and Write if there is nothing then the cost
+ // is one loop through the flusher.
+ int ops = SelectionKey.OP_READ;
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getTrafficMask().getInterestOps();
+ synchronized (readLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ //Change key to the WriteSelection Key
+ key = session.getWriteSelectionKey();
+ if (key != null && key.isValid())
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized (writeRequestQueue)
+ {
+ if (!writeRequestQueue.isEmpty())
+ {
+ ops = SelectionKey.OP_WRITE;
+ synchronized (writeLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ trafficMaskUpdateLock.unlock();
+ }
+
+ }
+
+ private class WriteWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
+
+ //System.out.println("WriteDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
+
+ doAddNewWrite();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
+ processWrite(writeSelector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("WriteDebug:"+"No keys from writeselector");
+ }
+
+ doRemove();
+ notifyWriteIdleness();
+
+ if (flushingSessionsSet.size() > 0)
+ {
+ doFlush();
+ }
+
+ if (writeSelector.keys().isEmpty())
+ {
+ synchronized (writeLock)
+ {
+
+ if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ writeWorker = null;
+ try
+ {
+ writeSelector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ writeSelector = null;
+ }
+
+ break;
+ }
+ }
+ }
+
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("WriteDebug:"+"Shutdown");
+ }
+
+ }
+
+ private class ReadWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
+
+ //System.out.println("ReadDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(SELECTOR_TIMEOUT);
+
+ doAddNewReader();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("ReadDebug:"+nKeys + " keys from selector");
+
+ processRead(selector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("ReadDebug:"+"No keys from selector");
+ }
+
+
+ doRemove();
+ notifyReadIdleness();
+
+ if (selector.keys().isEmpty())
+ {
+
+ synchronized (readLock)
+ {
+ if (selector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ readWorker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+
+ break;
+ }
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("ReadDebug:"+"Shutdown");
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
new file mode 100644
index 0000000000..043d4800b6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * An {@link IoConnectorConfig} for {@link SocketConnector}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl
+{
+ private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+ private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+ private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false;
+ private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false;
+
+ private static boolean DEFAULT_REUSE_ADDRESS;
+ private static int DEFAULT_RECEIVE_BUFFER_SIZE;
+ private static int DEFAULT_SEND_BUFFER_SIZE;
+ private static int DEFAULT_TRAFFIC_CLASS;
+ private static boolean DEFAULT_KEEP_ALIVE;
+ private static boolean DEFAULT_OOB_INLINE;
+ private static int DEFAULT_SO_LINGER;
+ private static boolean DEFAULT_TCP_NO_DELAY;
+
+ static
+ {
+ initialize();
+ }
+
+ private static void initialize()
+ {
+ Socket socket = null;
+
+ socket = new Socket();
+
+ try
+ {
+ DEFAULT_REUSE_ADDRESS = socket.getReuseAddress();
+ DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize();
+ DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize();
+ DEFAULT_KEEP_ALIVE = socket.getKeepAlive();
+ DEFAULT_OOB_INLINE = socket.getOOBInline();
+ DEFAULT_SO_LINGER = socket.getSoLinger();
+ DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay();
+
+ // Check if setReceiveBufferSize is supported.
+ try
+ {
+ socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE);
+ SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+ }
+
+ // Check if setSendBufferSize is supported.
+ try
+ {
+ socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE);
+ SET_SEND_BUFFER_SIZE_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+ }
+
+ // Check if getTrafficClass is supported.
+ try
+ {
+ DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass();
+ GET_TRAFFIC_CLASS_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ GET_TRAFFIC_CLASS_AVAILABLE = false;
+ DEFAULT_TRAFFIC_CLASS = 0;
+ }
+ }
+ catch( SocketException e )
+ {
+ throw new ExceptionInInitializerError(e);
+ }
+ finally
+ {
+ if( socket != null )
+ {
+ try
+ {
+ socket.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ public static boolean isSetReceiveBufferSizeAvailable() {
+ return SET_RECEIVE_BUFFER_SIZE_AVAILABLE;
+ }
+
+ public static boolean isSetSendBufferSizeAvailable() {
+ return SET_SEND_BUFFER_SIZE_AVAILABLE;
+ }
+
+ public static boolean isGetTrafficClassAvailable() {
+ return GET_TRAFFIC_CLASS_AVAILABLE;
+ }
+
+ public static boolean isSetTrafficClassAvailable() {
+ return SET_TRAFFIC_CLASS_AVAILABLE;
+ }
+
+ private boolean reuseAddress = DEFAULT_REUSE_ADDRESS;
+ private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+ private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+ private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+ private boolean keepAlive = DEFAULT_KEEP_ALIVE;
+ private boolean oobInline = DEFAULT_OOB_INLINE;
+ private int soLinger = DEFAULT_SO_LINGER;
+ private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+ /**
+ * Creates a new instance.
+ */
+ MultiThreadSocketSessionConfigImpl()
+ {
+ }
+
+ public boolean isReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ public void setReuseAddress( boolean reuseAddress )
+ {
+ this.reuseAddress = reuseAddress;
+ }
+
+ public int getReceiveBufferSize()
+ {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize( int receiveBufferSize )
+ {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getSendBufferSize()
+ {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize( int sendBufferSize )
+ {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getTrafficClass()
+ {
+ return trafficClass;
+ }
+
+ public void setTrafficClass( int trafficClass )
+ {
+ this.trafficClass = trafficClass;
+ }
+
+ public boolean isKeepAlive()
+ {
+ return keepAlive;
+ }
+
+ public void setKeepAlive( boolean keepAlive )
+ {
+ this.keepAlive = keepAlive;
+ }
+
+ public boolean isOobInline()
+ {
+ return oobInline;
+ }
+
+ public void setOobInline( boolean oobInline )
+ {
+ this.oobInline = oobInline;
+ }
+
+ public int getSoLinger()
+ {
+ return soLinger;
+ }
+
+ public void setSoLinger( int soLinger )
+ {
+ this.soLinger = soLinger;
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay( boolean tcpNoDelay )
+ {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
new file mode 100644
index 0000000000..be4a2d289d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
+import org.apache.mina.util.Queue;
+
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link IoSession} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+class MultiThreadSocketSessionImpl extends SocketSessionImpl
+{
+ private final IoService manager;
+ private final IoServiceConfig serviceConfig;
+ private final SocketSessionConfig config = new SessionConfigImpl();
+ private final MultiThreadSocketIoProcessor ioProcessor;
+ private final MultiThreadSocketFilterChain filterChain;
+ private final SocketChannel ch;
+ private final Queue writeRequestQueue;
+ private final IoHandler handler;
+ private final SocketAddress remoteAddress;
+ private final SocketAddress localAddress;
+ private final SocketAddress serviceAddress;
+ private final IoServiceListenerSupport serviceListeners;
+ private SelectionKey readKey, writeKey;
+ private int readBufferSize;
+ private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
+ private AtomicBoolean created = new AtomicBoolean(false);
+
+ /**
+ * Creates a new instance.
+ */
+ MultiThreadSocketSessionImpl( IoService manager,
+ SocketIoProcessor ioProcessor,
+ IoServiceListenerSupport listeners,
+ IoServiceConfig serviceConfig,
+ SocketChannel ch,
+ IoHandler defaultHandler,
+ SocketAddress serviceAddress )
+ {
+ super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress);
+ this.manager = manager;
+ this.serviceListeners = listeners;
+ this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor;
+ this.filterChain = new MultiThreadSocketFilterChain(this);
+ this.ch = ch;
+ this.writeRequestQueue = new Queue();
+ this.handler = defaultHandler;
+ this.remoteAddress = ch.socket().getRemoteSocketAddress();
+ this.localAddress = ch.socket().getLocalSocketAddress();
+ this.serviceAddress = serviceAddress;
+ this.serviceConfig = serviceConfig;
+
+ // Apply the initial session settings
+ IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
+ if( sessionConfig instanceof SocketSessionConfig )
+ {
+ SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig;
+ this.config.setKeepAlive( cfg.isKeepAlive() );
+ this.config.setOobInline( cfg.isOobInline() );
+ this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() );
+ this.readBufferSize = cfg.getReceiveBufferSize();
+ this.config.setReuseAddress( cfg.isReuseAddress() );
+ this.config.setSendBufferSize( cfg.getSendBufferSize() );
+ this.config.setSoLinger( cfg.getSoLinger() );
+ this.config.setTcpNoDelay( cfg.isTcpNoDelay() );
+
+ if( this.config.getTrafficClass() != cfg.getTrafficClass() )
+ {
+ this.config.setTrafficClass( cfg.getTrafficClass() );
+ }
+ }
+ }
+
+ void awaitRegistration() throws InterruptedException
+ {
+ registeredReadyLatch.countDown();
+
+ registeredReadyLatch.await();
+ }
+
+ boolean created() throws InterruptedException
+ {
+ return created.get();
+ }
+
+ void doneCreation()
+ {
+ created.getAndSet(true);
+ }
+
+ public IoService getService()
+ {
+ return manager;
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return serviceConfig;
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return config;
+ }
+
+ SocketIoProcessor getIoProcessor()
+ {
+ return ioProcessor;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filterChain;
+ }
+
+ SocketChannel getChannel()
+ {
+ return ch;
+ }
+
+ IoServiceListenerSupport getServiceListeners()
+ {
+ return serviceListeners;
+ }
+
+ SelectionKey getSelectionKey()
+ {
+ return readKey;
+ }
+
+ SelectionKey getReadSelectionKey()
+ {
+ return readKey;
+ }
+
+ SelectionKey getWriteSelectionKey()
+ {
+ return writeKey;
+ }
+
+ void setSelectionKey(SelectionKey key)
+ {
+ this.readKey = key;
+ }
+
+ void setWriteSelectionKey(SelectionKey key)
+ {
+ this.writeKey = key;
+ }
+
+ public IoHandler getHandler()
+ {
+ return handler;
+ }
+
+ protected void close0()
+ {
+ filterChain.fireFilterClose( this );
+ }
+
+ Queue getWriteRequestQueue()
+ {
+ return writeRequestQueue;
+ }
+
+ /**
+ @return int Number of write scheduled write requests
+ @deprecated
+ */
+ public int getScheduledWriteMessages()
+ {
+ return getScheduledWriteRequests();
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.size();
+ }
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.byteSize();
+ }
+ }
+
+ protected void write0( WriteRequest writeRequest )
+ {
+ filterChain.fireFilterWrite( this, writeRequest );
+ }
+
+ public TransportType getTransportType()
+ {
+ return TransportType.SOCKET;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ //This is what I had previously
+// return ch.socket().getRemoteSocketAddress();
+ return remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ //This is what I had previously
+// return ch.socket().getLocalSocketAddress();
+ return localAddress;
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return serviceAddress;
+ }
+
+ protected void updateTrafficMask()
+ {
+ this.ioProcessor.updateTrafficMask( this );
+ }
+
+ int getReadBufferSize()
+ {
+ return readBufferSize;
+ }
+
+ private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
+ {
+ public boolean isKeepAlive()
+ {
+ try
+ {
+ return ch.socket().getKeepAlive();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setKeepAlive( boolean on )
+ {
+ try
+ {
+ ch.socket().setKeepAlive( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isOobInline()
+ {
+ try
+ {
+ return ch.socket().getOOBInline();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setOobInline( boolean on )
+ {
+ try
+ {
+ ch.socket().setOOBInline( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isReuseAddress()
+ {
+ try
+ {
+ return ch.socket().getReuseAddress();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setReuseAddress( boolean on )
+ {
+ try
+ {
+ ch.socket().setReuseAddress( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public int getSoLinger()
+ {
+ try
+ {
+ return ch.socket().getSoLinger();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setSoLinger( int linger )
+ {
+ try
+ {
+ if( linger < 0 )
+ {
+ ch.socket().setSoLinger( false, 0 );
+ }
+ else
+ {
+ ch.socket().setSoLinger( true, linger );
+ }
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ try
+ {
+ return ch.socket().getTcpNoDelay();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setTcpNoDelay( boolean on )
+ {
+ try
+ {
+ ch.socket().setTcpNoDelay( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public int getTrafficClass()
+ {
+ if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
+ {
+ try
+ {
+ return ch.socket().getTrafficClass();
+ }
+ catch( SocketException e )
+ {
+ // Throw an exception only when setTrafficClass is also available.
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ public void setTrafficClass( int tc )
+ {
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ try
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ public int getSendBufferSize()
+ {
+ try
+ {
+ return ch.socket().getSendBufferSize();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setSendBufferSize( int size )
+ {
+ if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setSendBufferSize( size );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ public int getReceiveBufferSize()
+ {
+ try
+ {
+ return ch.socket().getReceiveBufferSize();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setReceiveBufferSize( int size )
+ {
+ if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setReceiveBufferSize( size );
+ MultiThreadSocketSessionImpl.this.readBufferSize = size;
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
new file mode 100644
index 0000000000..a23e546af5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.transport.vmpipe.support.VmPipe;
+import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
+import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
+import org.apache.mina.util.AnonymousSocketAddress;
+
+/**
+ * Connects to {@link IoHandler}s which is bound on the specified
+ * {@link VmPipeAddress}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class QpidVmPipeConnector extends VmPipeConnector
+{
+ private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
+ private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig()
+ {
+ public IoSessionConfig getSessionConfig()
+ {
+ return CONFIG;
+ }
+ };
+
+ /**
+ * Creates a new instance.
+ */
+ public QpidVmPipeConnector()
+ {
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ return connect( address, null, handler, config );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config )
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+ if( ! ( address instanceof VmPipeAddress ) )
+ throw new IllegalArgumentException(
+ "address must be VmPipeAddress." );
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address );
+ if( entry == null )
+ {
+ return DefaultConnectFuture.newFailedFuture(
+ new IOException( "Endpoint unavailable: " + address ) );
+ }
+
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ VmPipeSessionImpl localSession =
+ new VmPipeSessionImpl(
+ this,
+ config,
+ getListeners(),
+ new Object(), // lock
+ new AnonymousSocketAddress(),
+ handler,
+ entry );
+
+ // initialize acceptor session
+ VmPipeSessionImpl remoteSession = localSession.getRemoteSession();
+ try
+ {
+ IoFilterChain filterChain = remoteSession.getFilterChain();
+ entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ entry.getListeners().fireSessionCreated( remoteSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ remoteSession.close();
+ }
+
+
+ // initialize connector session
+ try
+ {
+ IoFilterChain filterChain = localSession.getFilterChain();
+ this.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
+ getListeners().fireSessionCreated( localSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( localSession);
+ }
+ catch( Throwable t )
+ {
+ future.setException( t );
+ }
+
+
+
+ return future;
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+} \ No newline at end of file