diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 16:21:34 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-14 16:21:34 +0000 |
commit | d84a3a50dbb794c4383de7e5eca730ca602771e7 (patch) | |
tree | 7c6177573a2eedc172de2cbd8354ce7b4ea1e8fe /qpid/java/common/src/main/java/org/apache | |
parent | 0aba202a7e2483f04fc77bbe4faa88bb86fe5b9b (diff) | |
parent | 47551f3aa2dd46b8daeeb9683a668464203ffa06 (diff) | |
download | qpid-python-d84a3a50dbb794c4383de7e5eca730ca602771e7.tar.gz |
Create sandbox from correct revision
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1157557 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache')
62 files changed, 7345 insertions, 1151 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java new file mode 100644 index 0000000000..0c311b6645 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -0,0 +1,467 @@ +package org.apache.mina.common; + +import org.apache.mina.common.ByteBuffer; + +import java.nio.*; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class FixedSizeByteBufferAllocator implements ByteBufferAllocator +{ + + + private static final int MINIMUM_CAPACITY = 1; + + public FixedSizeByteBufferAllocator () + { + } + + public ByteBuffer allocate( int capacity, boolean direct ) + { + java.nio.ByteBuffer nioBuffer; + if( direct ) + { + nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity ); + } + else + { + nioBuffer = java.nio.ByteBuffer.allocate( capacity ); + } + return new FixedSizeByteBuffer( nioBuffer ); + } + + public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer ) + { + return new FixedSizeByteBuffer( nioBuffer ); + } + + public void dispose() + { + } + + + + private static final class FixedSizeByteBuffer extends ByteBuffer + { + private java.nio.ByteBuffer buf; + private int mark = -1; + + + protected FixedSizeByteBuffer( java.nio.ByteBuffer buf ) + { + this.buf = buf; + buf.order( ByteOrder.BIG_ENDIAN ); + } + + public synchronized void acquire() + { + } + + public void release() + { + } + + public java.nio.ByteBuffer buf() + { + return buf; + } + + public boolean isPooled() + { + return false; + } + + public void setPooled( boolean pooled ) + { + } + + public ByteBuffer duplicate() { + return new FixedSizeByteBuffer( this.buf.duplicate() ); + } + + public ByteBuffer slice() { + return new FixedSizeByteBuffer( this.buf.slice() ); + } + + public ByteBuffer asReadOnlyBuffer() { + return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() ); + } + + public byte[] array() + { + return buf.array(); + } + + public int arrayOffset() + { + return buf.arrayOffset(); + } + + public boolean isDirect() + { + return buf.isDirect(); + } + + public boolean isReadOnly() + { + return buf.isReadOnly(); + } + + public int capacity() + { + return buf.capacity(); + } + + public ByteBuffer capacity( int newCapacity ) + { + if( newCapacity > capacity() ) + { + throw new IllegalArgumentException(); + } + + return this; + } + + + + public boolean isAutoExpand() + { + return false; + } + + public ByteBuffer setAutoExpand( boolean autoExpand ) + { + if(autoExpand) throw new IllegalArgumentException(); + else return this; + } + + public ByteBuffer expand( int pos, int expectedRemaining ) + { + int end = pos + expectedRemaining; + if( end > capacity() ) + { + // The buffer needs expansion. + capacity( end ); + } + + if( end > limit() ) + { + // We call limit() directly to prevent StackOverflowError + buf.limit( end ); + } + return this; + } + + public int position() + { + return buf.position(); + } + + public ByteBuffer position( int newPosition ) + { + + buf.position( newPosition ); + if( mark > newPosition ) + { + mark = -1; + } + return this; + } + + public int limit() + { + return buf.limit(); + } + + public ByteBuffer limit( int newLimit ) + { + buf.limit( newLimit ); + if( mark > newLimit ) + { + mark = -1; + } + return this; + } + + public ByteBuffer mark() + { + buf.mark(); + mark = position(); + return this; + } + + public int markValue() + { + return mark; + } + + public ByteBuffer reset() + { + buf.reset(); + return this; + } + + public ByteBuffer clear() + { + buf.clear(); + mark = -1; + return this; + } + + public ByteBuffer flip() + { + buf.flip(); + mark = -1; + return this; + } + + public ByteBuffer rewind() + { + buf.rewind(); + mark = -1; + return this; + } + + public byte get() + { + return buf.get(); + } + + public ByteBuffer put( byte b ) + { + buf.put( b ); + return this; + } + + public byte get( int index ) + { + return buf.get( index ); + } + + public ByteBuffer put( int index, byte b ) + { + buf.put( index, b ); + return this; + } + + public ByteBuffer get( byte[] dst, int offset, int length ) + { + buf.get( dst, offset, length ); + return this; + } + + public ByteBuffer put( java.nio.ByteBuffer src ) + { + buf.put( src ); + return this; + } + + public ByteBuffer put( byte[] src, int offset, int length ) + { + buf.put( src, offset, length ); + return this; + } + + public ByteBuffer compact() + { + buf.compact(); + mark = -1; + return this; + } + + public ByteOrder order() + { + return buf.order(); + } + + public ByteBuffer order( ByteOrder bo ) + { + buf.order( bo ); + return this; + } + + public char getChar() + { + return buf.getChar(); + } + + public ByteBuffer putChar( char value ) + { + buf.putChar( value ); + return this; + } + + public char getChar( int index ) + { + return buf.getChar( index ); + } + + public ByteBuffer putChar( int index, char value ) + { + buf.putChar( index, value ); + return this; + } + + public CharBuffer asCharBuffer() + { + return buf.asCharBuffer(); + } + + public short getShort() + { + return buf.getShort(); + } + + public ByteBuffer putShort( short value ) + { + buf.putShort( value ); + return this; + } + + public short getShort( int index ) + { + return buf.getShort( index ); + } + + public ByteBuffer putShort( int index, short value ) + { + buf.putShort( index, value ); + return this; + } + + public ShortBuffer asShortBuffer() + { + return buf.asShortBuffer(); + } + + public int getInt() + { + return buf.getInt(); + } + + public ByteBuffer putInt( int value ) + { + buf.putInt( value ); + return this; + } + + public int getInt( int index ) + { + return buf.getInt( index ); + } + + public ByteBuffer putInt( int index, int value ) + { + buf.putInt( index, value ); + return this; + } + + public IntBuffer asIntBuffer() + { + return buf.asIntBuffer(); + } + + public long getLong() + { + return buf.getLong(); + } + + public ByteBuffer putLong( long value ) + { + buf.putLong( value ); + return this; + } + + public long getLong( int index ) + { + return buf.getLong( index ); + } + + public ByteBuffer putLong( int index, long value ) + { + buf.putLong( index, value ); + return this; + } + + public LongBuffer asLongBuffer() + { + return buf.asLongBuffer(); + } + + public float getFloat() + { + return buf.getFloat(); + } + + public ByteBuffer putFloat( float value ) + { + buf.putFloat( value ); + return this; + } + + public float getFloat( int index ) + { + return buf.getFloat( index ); + } + + public ByteBuffer putFloat( int index, float value ) + { + buf.putFloat( index, value ); + return this; + } + + public FloatBuffer asFloatBuffer() + { + return buf.asFloatBuffer(); + } + + public double getDouble() + { + return buf.getDouble(); + } + + public ByteBuffer putDouble( double value ) + { + buf.putDouble( value ); + return this; + } + + public double getDouble( int index ) + { + return buf.getDouble( index ); + } + + public ByteBuffer putDouble( int index, double value ) + { + buf.putDouble( index, value ); + return this; + } + + public DoubleBuffer asDoubleBuffer() + { + return buf.asDoubleBuffer(); + } + + + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java new file mode 100644 index 0000000000..4fd28c4eb5 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.common.support; + +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFutureListener; + +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; + +/** + * A default implementation of {@link org.apache.mina.common.IoFuture}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + */ +public class DefaultIoFuture implements IoFuture +{ + private final IoSession session; + private final Object lock; + private List listeners; + private Object result; + private boolean ready; + + + /** + * Creates a new instance. + * + * @param session an {@link IoSession} which is associated with this future + */ + public DefaultIoFuture( IoSession session ) + { + this.session = session; + this.lock = this; + } + + /** + * Creates a new instance which uses the specified object as a lock. + */ + public DefaultIoFuture( IoSession session, Object lock ) + { + if( lock == null ) + { + throw new NullPointerException( "lock" ); + } + this.session = session; + this.lock = lock; + } + + public IoSession getSession() + { + return session; + } + + public Object getLock() + { + return lock; + } + + public void join() + { + synchronized( lock ) + { + while( !ready ) + { + try + { + lock.wait(); + } + catch( InterruptedException e ) + { + } + } + } + } + + public boolean join( long timeoutInMillis ) + { + long startTime = ( timeoutInMillis <= 0 ) ? 0 : System + .currentTimeMillis(); + long waitTime = timeoutInMillis; + + synchronized( lock ) + { + if( ready ) + { + return ready; + } + else if( waitTime <= 0 ) + { + return ready; + } + + for( ;; ) + { + try + { + lock.wait( waitTime ); + } + catch( InterruptedException e ) + { + } + + if( ready ) + return true; + else + { + waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime ); + if( waitTime <= 0 ) + { + return ready; + } + } + } + } + } + + public boolean isReady() + { + synchronized( lock ) + { + return ready; + } + } + + /** + * Sets the result of the asynchronous operation, and mark it as finished. + */ + protected void setValue( Object newValue ) + { + synchronized( lock ) + { + // Allow only once. + if( ready ) + { + return; + } + + result = newValue; + ready = true; + lock.notifyAll(); + + notifyListeners(); + } + } + + /** + * Returns the result of the asynchronous operation. + */ + protected Object getValue() + { + synchronized( lock ) + { + return result; + } + } + + public void addListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + if(listeners == null) + { + listeners = new ArrayList(); + } + listeners.add( listener ); + if( ready ) + { + listener.operationComplete( this ); + } + } + } + + public void removeListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + listeners.remove( listener ); + } + } + + private void notifyListeners() + { + synchronized( lock ) + { + + if(listeners != null) + { + + for( Iterator i = listeners.iterator(); i.hasNext(); ) { + ( ( IoFutureListener ) i.next() ).operationComplete( this ); + } + } + } + } +} + + + diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java b/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java new file mode 100644 index 0000000000..5723ffbaa9 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.common.support; + +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import org.apache.mina.common.IoAcceptorConfig; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoFutureListener; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoService; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoServiceListener; +import org.apache.mina.common.IoSession; +import org.apache.mina.util.IdentityHashSet; + +/** + * A helper which provides addition and removal of {@link IoServiceListener}s and firing + * events. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 446526 $, $Date: 2006-09-15 01:44:11 -0400 (Fri, 15 Sep 2006) $ + */ +public class IoServiceListenerSupport +{ + /** + * A list of {@link IoServiceListener}s. + */ + private final List listeners = new ArrayList(); + + /** + * Tracks managed <tt>serviceAddress</tt>es. + */ + private final Set managedServiceAddresses = new HashSet(); + + /** + * Tracks managed sesssions with <tt>serviceAddress</tt> as a key. + */ + private final Map managedSessions = new HashMap(); + + /** + * Creates a new instance. + */ + public IoServiceListenerSupport() + { + } + + /** + * Adds a new listener. + */ + public void add( IoServiceListener listener ) + { + synchronized( listeners ) + { + listeners.add( listener ); + } + } + + /** + * Removes an existing listener. + */ + public void remove( IoServiceListener listener ) + { + synchronized( listeners ) + { + listeners.remove( listener ); + } + } + + public Set getManagedServiceAddresses() + { + return Collections.unmodifiableSet( managedServiceAddresses ); + } + + public boolean isManaged( SocketAddress serviceAddress ) + { + synchronized( managedServiceAddresses ) + { + return managedServiceAddresses.contains( serviceAddress ); + } + } + + public Set getManagedSessions( SocketAddress serviceAddress ) + { + Set sessions; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + if( sessions == null ) + { + sessions = new IdentityHashSet(); + } + } + + synchronized( sessions ) + { + return new IdentityHashSet( sessions ); + } + } + + /** + * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)} + * for all registered listeners. + */ + public void fireServiceActivated( + IoService service, SocketAddress serviceAddress, + IoHandler handler, IoServiceConfig config ) + { + synchronized( managedServiceAddresses ) + { + if( !managedServiceAddresses.add( serviceAddress ) ) + { + return; + } + } + + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).serviceActivated( + service, serviceAddress, handler, config ); + } + } + } + + /** + * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)} + * for all registered listeners. + */ + public synchronized void fireServiceDeactivated( + IoService service, SocketAddress serviceAddress, + IoHandler handler, IoServiceConfig config ) + { + synchronized( managedServiceAddresses ) + { + if( !managedServiceAddresses.remove( serviceAddress ) ) + { + return; + } + } + + try + { + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).serviceDeactivated( + service, serviceAddress, handler, config ); + } + } + } + finally + { + disconnectSessions( serviceAddress, config ); + } + } + + + /** + * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners. + */ + public void fireSessionCreated( IoSession session ) + { + SocketAddress serviceAddress = session.getServiceAddress(); + + // Get the session set. + boolean firstSession = false; + Set sessions; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + if( sessions == null ) + { + sessions = new IdentityHashSet(); + managedSessions.put( serviceAddress, sessions ); + firstSession = true; + } + } + + // If already registered, ignore. + synchronized( sessions ) + { + if ( !sessions.add( session ) ) + { + return; + } + } + + // If the first connector session, fire a virtual service activation event. + if( session.getService() instanceof IoConnector && firstSession ) + { + fireServiceActivated( + session.getService(), session.getServiceAddress(), + session.getHandler(), session.getServiceConfig() ); + } + + // Fire session events. + session.getFilterChain().fireSessionCreated( session ); + session.getFilterChain().fireSessionOpened( session); + + // Fire listener events. + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).sessionCreated( session ); + } + } + } + + /** + * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners. + */ + public void fireSessionDestroyed( IoSession session ) + { + SocketAddress serviceAddress = session.getServiceAddress(); + + // Get the session set. + Set sessions; + boolean lastSession = false; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + // Ignore if unknown. + if( sessions == null ) + { + return; + } + + // Try to remove the remaining empty seession set after removal. + synchronized( sessions ) + { + sessions.remove( session ); + if( sessions.isEmpty() ) + { + managedSessions.remove( serviceAddress ); + lastSession = true; + } + } + } + + // Fire session events. + session.getFilterChain().fireSessionClosed( session ); + + // Fire listener events. + try + { + synchronized( listeners ) + { + for( Iterator i = listeners.iterator(); i.hasNext(); ) + { + ( ( IoServiceListener ) i.next() ).sessionDestroyed( session ); + } + } + } + finally + { + // Fire a virtual service deactivation event for the last session of the connector. + //TODO double-check that this is *STILL* the last session. May not be the case + if( session.getService() instanceof IoConnector && lastSession ) + { + fireServiceDeactivated( + session.getService(), session.getServiceAddress(), + session.getHandler(), session.getServiceConfig() ); + } + } + } + + private void disconnectSessions( SocketAddress serviceAddress, IoServiceConfig config ) + { + if( !( config instanceof IoAcceptorConfig ) ) + { + return; + } + + if( !( ( IoAcceptorConfig ) config ).isDisconnectOnUnbind() ) + { + return; + } + + Set sessions; + synchronized( managedSessions ) + { + sessions = ( Set ) managedSessions.get( serviceAddress ); + } + + if( sessions == null ) + { + return; + } + + Set sessionsCopy; + + // Create a copy to avoid ConcurrentModificationException + synchronized( sessions ) + { + sessionsCopy = new IdentityHashSet( sessions ); + } + + final CountDownLatch latch = new CountDownLatch(sessionsCopy.size()); + + for( Iterator i = sessionsCopy.iterator(); i.hasNext(); ) + { + ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener() + { + public void operationComplete( IoFuture future ) + { + latch.countDown(); + } + } ); + } + + try + { + latch.await(); + } + catch( InterruptedException ie ) + { + // Ignored + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java index 1f69973b96..47f19aa76d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java @@ -1,4 +1,6 @@ -/* +package org.apache.mina.filter; + +import org.apache.mina.common.IoFilter;/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,30 +20,29 @@ * under the License. * */ -package org.apache.qpid.transport.network; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Sender; -public interface NetworkConnection +public class WriteBufferFullExeception extends RuntimeException { - Sender<ByteBuffer> getSender(); + private IoFilter.WriteRequest _writeRequest; - void close(); + public WriteBufferFullExeception() + { + this(null); + } - /** - * Returns the remote address of the underlying socket. - */ - SocketAddress getRemoteAddress(); + public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest) + { + _writeRequest = writeRequest; + } - /** - * Returns the local address of the underlying socket. - */ - SocketAddress getLocalAddress(); - void setMaxWriteIdle(int sec); + public void setWriteRequest(IoFilter.WriteRequest writeRequest) + { + _writeRequest = writeRequest; + } - void setMaxReadIdle(int sec); -}
\ No newline at end of file + public IoFilter.WriteRequest getWriteRequest() + { + return _writeRequest; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java new file mode 100644 index 0000000000..4e9db9071a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.DefaultIoFilterChainBuilder; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.executor.ExecutorFilter; + +import java.util.Iterator; +import java.util.List; + +/** + * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than + * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the + * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to + * cause an Out of Memory exception due to a back log of unprocessed messages. + * + * This is should only be viewed as a temporary work around for DIRMINA-302. + * + * A true solution should not be implemented as a filter as this issue will always occur. On a machine + * where the network is slower than the local producer. + * + * Suggested improvement is to allow implementation of policices on what to do when buffer is full. + * + * They could be: + * Block - As this does + * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks + * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state + * + * <p/> + * <p>Usage: + * <p/> + * <pre><code> + * DefaultFilterChainBuilder builder = ... + * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); + * filter.attach( builder ); + * </code></pre> + * <p/> + * or + * <p/> + * <pre><code> + * IoFilterChain chain = ... + * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); + * filter.attach( chain ); + * </code></pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class WriteBufferLimitFilterBuilder +{ + public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize"; + + private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000; + + private volatile boolean throwNotBlock = false; + + private volatile int maximumConnectionBufferCount; + private volatile long maximumConnectionBufferSize; + + private final Object _blockLock = new Object(); + + private int _blockWaiters = 0; + + + public WriteBufferLimitFilterBuilder() + { + this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT); + } + + public WriteBufferLimitFilterBuilder(int maxWriteBufferSize) + { + setMaximumConnectionBufferCount(maxWriteBufferSize); + } + + + /** + * Set the maximum amount pending items in the writeQueue for a given session. + * Changing the value will only take effect when new data is received for a + * connection, including existing connections. Default value is 5000 msgs. + * + * @param maximumConnectionBufferCount New buffer size. Must be > 0 + */ + public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount) + { + this.maximumConnectionBufferCount = maximumConnectionBufferCount; + this.maximumConnectionBufferSize = 0; + } + + public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize) + { + this.maximumConnectionBufferSize = maximumConnectionBufferSize; + this.maximumConnectionBufferCount = 0; + } + + /** + * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself + * before and after that filter. + * + * @param chain {@link IoFilterChain} to attach self to. + */ + public void attach(IoFilterChain chain) + { + String name = getThreadPoolFilterEntryName(chain.getAll()); + + chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); + } + + /** + * Attach this filter to the specified builder. It will search for the + * {@link ExecutorFilter}, and attach itself before and after that filter. + * + * @param builder {@link DefaultIoFilterChainBuilder} to attach self to. + */ + public void attach(DefaultIoFilterChainBuilder builder) + { + String name = getThreadPoolFilterEntryName(builder.getAll()); + + builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); + } + + private String getThreadPoolFilterEntryName(List entries) + { + Iterator i = entries.iterator(); + + while (i.hasNext()) + { + IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next(); + + if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class)) + { + return entry.getName(); + } + } + + throw new IllegalStateException("Chain does not contain a ExecutorFilter"); + } + + + public class SendLimit extends IoFilterAdapter + { + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception + { + try + { + waitTillSendAllowed(session); + } + catch (WriteBufferFullExeception wbfe) + { + nextFilter.exceptionCaught(session, wbfe); + } + + if (writeRequest.getMessage() instanceof ByteBuffer) + { + increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage()); + } + + nextFilter.filterWrite(session, writeRequest); + } + + private void increasePendingWriteSize(IoSession session, ByteBuffer message) + { + synchronized (session) + { + Long pendingSize = getScheduledWriteBytes(session) + message.remaining(); + session.setAttribute(PENDING_SIZE, pendingSize); + } + } + + private boolean sendAllowed(IoSession session) + { + if (session.isClosing()) + { + return true; + } + + int lmswm = maximumConnectionBufferCount; + long lmswb = maximumConnectionBufferSize; + + return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm) + && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb); + } + + private long getScheduledWriteBytes(IoSession session) + { + synchronized (session) + { + Long i = (Long) session.getAttribute(PENDING_SIZE); + return null == i ? 0 : i; + } + } + + private void waitTillSendAllowed(IoSession session) + { + synchronized (_blockLock) + { + if (throwNotBlock) + { + throw new WriteBufferFullExeception(); + } + + _blockWaiters++; + + while (!sendAllowed(session)) + { + try + { + _blockLock.wait(); + } + catch (InterruptedException e) + { + // Ignore. + } + } + _blockWaiters--; + } + } + + public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception + { + if (message instanceof ByteBuffer) + { + decrementPendingWriteSize(session, (ByteBuffer) message); + } + notifyWaitingWriters(); + nextFilter.messageSent(session, message); + } + + private void decrementPendingWriteSize(IoSession session, ByteBuffer message) + { + synchronized (session) + { + session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining()); + } + } + + private void notifyWaitingWriters() + { + synchronized (_blockLock) + { + if (_blockWaiters != 0) + { + _blockLock.notifyAll(); + } + } + + } + + }//SentLimit + + +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java new file mode 100644 index 0000000000..3f7e206cb4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.codec; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; + +/** + * A {@link ProtocolDecoder} that cumulates the content of received + * buffers to a <em>cumulative buffer</em> to help users implement decoders. + * <p> + * If the received {@link ByteBuffer} is only a part of a message. + * decoders should cumulate received buffers to make a message complete or + * to postpone decoding until more buffers arrive. + * <p> + * Here is an example decoder that decodes CRLF terminated lines into + * <code>Command</code> objects: + * <pre> + * public class CRLFTerminatedCommandLineDecoder + * extends CumulativeProtocolDecoder { + * + * private Command parseCommand(ByteBuffer in) { + * // Convert the bytes in the specified buffer to a + * // Command object. + * ... + * } + * + * protected boolean doDecode(IoSession session, ByteBuffer in, + * ProtocolDecoderOutput out) + * throws Exception { + * + * // Remember the initial position. + * int start = in.position(); + * + * // Now find the first CRLF in the buffer. + * byte previous = 0; + * while (in.hasRemaining()) { + * byte current = in.get(); + * + * if (previous == '\r' && current == '\n') { + * // Remember the current position and limit. + * int position = in.position(); + * int limit = in.limit(); + * try { + * in.position(start); + * in.limit(position); + * // The bytes between in.position() and in.limit() + * // now contain a full CRLF terminated line. + * out.write(parseCommand(in.slice())); + * } finally { + * // Set the position to point right after the + * // detected line and set the limit to the old + * // one. + * in.position(position); + * in.limit(limit); + * } + * // Decoded one line; CumulativeProtocolDecoder will + * // call me again until I return false. So just + * // return true until there are no more lines in the + * // buffer. + * return true; + * } + * + * previous = current; + * } + * + * // Could not find CRLF in the buffer. Reset the initial + * // position to the one we recorded above. + * in.position(start); + * + * return false; + * } + * } + * </pre> + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter { + + private static final String BUFFER = OurCumulativeProtocolDecoder.class + .getName() + + ".Buffer"; + + /** + * Creates a new instance. + */ + protected OurCumulativeProtocolDecoder() { + } + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is NOT compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception { + boolean usingSessionBuffer = true; + ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER); + // If we have a session buffer, append data to that; otherwise + // use the buffer read from the network directly. + if (buf != null) { + buf.put(in); + buf.flip(); + } else { + buf = in; + usingSessionBuffer = false; + } + + for (;;) { + int oldPos = buf.position(); + boolean decoded = doDecode(session, buf, out); + if (decoded) { + if (buf.position() == oldPos) { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed."); + } + + if (!buf.hasRemaining()) { + break; + } + } else { + break; + } + } + + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if (buf.hasRemaining()) { + storeRemainingInSession(buf, session); + } else { + if (usingSessionBuffer) + removeSessionBuffer(session); + } + } + + /** + * Implement this method to consume the specified cumulative buffer and + * decode its content into message(s). + * + * @param in the cumulative buffer + * @return <tt>true</tt> if and only if there's more to decode in the buffer + * and you want to have <tt>doDecode</tt> method invoked again. + * Return <tt>false</tt> if remaining data is not enough to decode, + * then this method will be invoked again when more data is cumulated. + * @throws Exception if cannot decode <tt>in</tt>. + */ + protected abstract boolean doDecode(IoSession session, ByteBuffer in, + ProtocolDecoderOutput out) throws Exception; + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose(IoSession session) throws Exception { + removeSessionBuffer(session); + } + + private void removeSessionBuffer(IoSession session) { + ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER); + if (buf != null) { + buf.release(); + } + } + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) { + ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity()); + remainingBuf.setAutoExpand(true); + remainingBuf.order(buf.order()); + remainingBuf.put(buf); + session.setAttribute(BUFFER, remainingBuf); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java new file mode 100644 index 0000000000..b8c6f29720 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java @@ -0,0 +1,440 @@ +package org.apache.mina.filter.codec; + + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultWriteFuture; +import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; +import org.apache.mina.util.SessionLog; +import org.apache.mina.util.Queue; + + +public class QpidProtocolCodecFilter extends IoFilterAdapter +{ + public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder"; + public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder"; + + private static final Class[] EMPTY_PARAMS = new Class[0]; + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); + + private final ProtocolCodecFactory factory; + + public QpidProtocolCodecFilter( ProtocolCodecFactory factory ) + { + if( factory == null ) + { + throw new NullPointerException( "factory" ); + } + this.factory = factory; + } + + public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) + { + if( encoder == null ) + { + throw new NullPointerException( "encoder" ); + } + if( decoder == null ) + { + throw new NullPointerException( "decoder" ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() + { + return encoder; + } + + public ProtocolDecoder getDecoder() + { + return decoder; + } + }; + } + + public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) + { + if( encoderClass == null ) + { + throw new NullPointerException( "encoderClass" ); + } + if( decoderClass == null ) + { + throw new NullPointerException( "decoderClass" ); + } + if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) ) + { + throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() ); + } + if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) ) + { + throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() ); + } + try + { + encoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." ); + } + try + { + decoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() throws Exception + { + return ( ProtocolEncoder ) encoderClass.newInstance(); + } + + public ProtocolDecoder getDecoder() throws Exception + { + return ( ProtocolDecoder ) decoderClass.newInstance(); + } + }; + } + + public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception + { + if( parent.contains( ProtocolCodecFilter.class ) ) + { + throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." ); + } + } + + public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( !( message instanceof ByteBuffer ) ) + { + nextFilter.messageReceived( session, message ); + return; + } + + ByteBuffer in = ( ByteBuffer ) message; + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + + try + { + decoder.decode( session, in, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + pde.setHexdump( in.getHexDump() ); + throw pde; + } + finally + { + // Dispose the decoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeDecoder( session ); + } + + // Release the read buffer. + in.release(); + + decoderOut.flush(); + } + } + + public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( message instanceof HiddenByteBuffer ) + { + return; + } + + if( !( message instanceof MessageByteBuffer ) ) + { + nextFilter.messageSent( session, message ); + return; + } + + nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); + } + + public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception + { + Object message = writeRequest.getMessage(); + if( message instanceof ByteBuffer ) + { + nextFilter.filterWrite( session, writeRequest ); + return; + } + + ProtocolEncoder encoder = getEncoder( session ); + ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); + + try + { + encoder.encode( session, message, encoderOut ); + encoderOut.flush(); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + new MessageByteBuffer( writeRequest.getMessage() ), + writeRequest.getFuture(), writeRequest.getDestination() ) ); + } + catch( Throwable t ) + { + ProtocolEncoderException pee; + if( t instanceof ProtocolEncoderException ) + { + pee = ( ProtocolEncoderException ) t; + } + else + { + pee = new ProtocolEncoderException( t ); + } + throw pee; + } + finally + { + // Dispose the encoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeEncoder( session ); + } + } + } + + public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception + { + // Call finishDecode() first when a connection is closed. + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + try + { + decoder.finishDecode( session, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + throw pde; + } + finally + { + // Dispose all. + disposeEncoder( session ); + disposeDecoder( session ); + + decoderOut.flush(); + } + + nextFilter.sessionClosed( session ); + } + + private ProtocolEncoder getEncoder( IoSession session ) throws Exception + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER ); + if( encoder == null ) + { + encoder = factory.getEncoder(); + session.setAttribute( ENCODER, encoder ); + } + return encoder; + } + + private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); + } + + private ProtocolDecoder getDecoder( IoSession session ) throws Exception + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); + if( decoder == null ) + { + decoder = factory.getDecoder(); + session.setAttribute( DECODER, decoder ); + } + return decoder; + } + + private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter ) + { + return new SimpleProtocolDecoderOutput( session, nextFilter ); + } + + private void disposeEncoder( IoSession session ) + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); + if( encoder == null ) + { + return; + } + + try + { + encoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Failed to dispose: " + encoder.getClass().getName() + + " (" + encoder + ')' ); + } + } + + private void disposeDecoder( IoSession session ) + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER ); + if( decoder == null ) + { + return; + } + + try + { + decoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Falied to dispose: " + decoder.getClass().getName() + + " (" + decoder + ')' ); + } + } + + private static class HiddenByteBuffer extends ByteBufferProxy + { + private HiddenByteBuffer( ByteBuffer buf ) + { + super( buf ); + } + } + + private static class MessageByteBuffer extends ByteBufferProxy + { + private final Object message; + + private MessageByteBuffer( Object message ) + { + super( EMPTY_BUFFER ); + this.message = message; + } + + public void acquire() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + + public void release() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + } + + private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput + { + private ByteBuffer buffer; + + private final IoSession session; + private final IoFilter.NextFilter nextFilter; + private final IoFilter.WriteRequest writeRequest; + + public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + this.session = session; + this.nextFilter = nextFilter; + this.writeRequest = writeRequest; + } + + + + public void write( ByteBuffer buf ) + { + if(buffer != null) + { + flush(); + } + buffer = buf; + } + + public void mergeAll() + { + } + + public WriteFuture flush() + { + WriteFuture future = null; + if( buffer == null ) + { + return null; + } + else + { + ByteBuffer buf = buffer; + // Flush only when the buffer has remaining. + if( buf.hasRemaining() ) + { + future = doFlush( buf ); + } + + } + + return future; + } + + + protected WriteFuture doFlush( ByteBuffer buf ) + { + WriteFuture future = new DefaultWriteFuture( session ); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + buf, + future, writeRequest.getDestination() ) ); + return future; + } + } +} + diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java new file mode 100644 index 0000000000..e5360d32e0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoAcceptor; +import org.apache.mina.util.Queue; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.NamePreservingRunnable; +import edu.emory.mathcs.backport.java.util.concurrent.Executor; + +/** + * {@link IoAcceptor} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class MultiThreadSocketAcceptor extends SocketAcceptor +{ + /** + * @noinspection StaticNonFinalField + */ + private static volatile int nextId = 0; + + private final Executor executor; + private final Object lock = new Object(); + private final int id = nextId ++; + private final String threadName = "SocketAcceptor-" + id; + private final Map channels = new HashMap(); + + private final Queue registerQueue = new Queue(); + private final Queue cancelQueue = new Queue(); + + private final MultiThreadSocketIoProcessor[] ioProcessors; + private final int processorCount; + + /** + * @noinspection FieldAccessedSynchronizedAndUnsynchronized + */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + + /** + * Create an acceptor with a single processing thread using a NewThreadExecutor + */ + public MultiThreadSocketAcceptor() + { + this( 1, new NewThreadExecutor() ); + } + + /** + * Create an acceptor with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public MultiThreadSocketAcceptor( int processorCount, Executor executor ) + { + if( processorCount < 1 ) + { + throw new IllegalArgumentException( "Must have at least one processor" ); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; + + for( int i = 0; i < processorCount; i++ ) + { + ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor ); + } + } + + + /** + * Binds to the specified <code>address</code> and handles incoming connections with the specified + * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. + * + * @throws IOException if failed to bind + */ + public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException + { + if( handler == null ) + { + throw new NullPointerException( "handler" ); + } + + if( address != null && !( address instanceof InetSocketAddress ) ) + { + throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); + } + + if( config == null ) + { + config = getDefaultConfig(); + } + + RegistrationRequest request = new RegistrationRequest( address, handler, config ); + + synchronized( registerQueue ) + { + registerQueue.push( request ); + } + + startupWorker(); + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + if( request.exception != null ) + { + throw request.exception; + } + } + + + private synchronized void startupWorker() throws IOException + { + synchronized( lock ) + { + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + + executor.execute( new NamePreservingRunnable( worker ) ); + } + } + } + + public void unbind( SocketAddress address ) + { + if( address == null ) + { + throw new NullPointerException( "address" ); + } + + CancellationRequest request = new CancellationRequest( address ); + + try + { + startupWorker(); + } + catch( IOException e ) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply throw + // IllegalArgumentException here because we can simply + // conclude that nothing is bound to the selector. + throw new IllegalArgumentException( "Address not bound: " + address ); + } + + synchronized( cancelQueue ) + { + cancelQueue.push( request ); + } + + selector.wakeup(); + + synchronized( request ) + { + while( !request.done ) + { + try + { + request.wait(); + } + catch( InterruptedException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + + if( request.exception != null ) + { + request.exception.fillInStackTrace(); + + throw request.exception; + } + } + + + private class Worker implements Runnable + { + public void run() + { + Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName ); + + for( ; ; ) + { + try + { + int nKeys = selector.select(); + + registerNew(); + + if( nKeys > 0 ) + { + processSessions( selector.selectedKeys() ); + } + + cancelKeys(); + + if( selector.keys().isEmpty() ) + { + synchronized( lock ) + { + if( selector.keys().isEmpty() && + registerQueue.isEmpty() && + cancelQueue.isEmpty() ) + { + worker = null; + try + { + selector.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + selector = null; + } + break; + } + } + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + + try + { + Thread.sleep( 1000 ); + } + catch( InterruptedException e1 ) + { + ExceptionMonitor.getInstance().exceptionCaught( e1 ); + } + } + } + } + + private void processSessions( Set keys ) throws IOException + { + Iterator it = keys.iterator(); + while( it.hasNext() ) + { + SelectionKey key = ( SelectionKey ) it.next(); + + it.remove(); + + if( !key.isAcceptable() ) + { + continue; + } + + ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); + + SocketChannel ch = ssc.accept(); + + if( ch == null ) + { + continue; + } + + boolean success = false; + try + { + + RegistrationRequest req = ( RegistrationRequest ) key.attachment(); + + MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl( + MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(), + req.config, ch, req.handler, req.address ); + + // New Interface +// SocketSessionImpl session = new SocketSessionImpl( +// SocketAcceptor.this, nextProcessor(), getListeners(), +// req.config, ch, req.handler, req.address ); + + + getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); + req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); + req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); + session.getIoProcessor().addNew( session ); + success = true; + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + } + finally + { + if( !success ) + { + ch.close(); + } + } + } + } + } + + private MultiThreadSocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + + private void registerNew() + { + if( registerQueue.isEmpty() ) + { + return; + } + + for( ; ; ) + { + RegistrationRequest req; + + synchronized( registerQueue ) + { + req = ( RegistrationRequest ) registerQueue.pop(); + } + + if( req == null ) + { + break; + } + + ServerSocketChannel ssc = null; + + try + { + ssc = ServerSocketChannel.open(); + ssc.configureBlocking( false ); + + // Configure the server socket, + SocketAcceptorConfig cfg; + if( req.config instanceof SocketAcceptorConfig ) + { + cfg = ( SocketAcceptorConfig ) req.config; + } + else + { + cfg = ( SocketAcceptorConfig ) getDefaultConfig(); + } + + ssc.socket().setReuseAddress( cfg.isReuseAddress() ); + ssc.socket().setReceiveBufferSize( + ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() ); + + // and bind. + ssc.socket().bind( req.address, cfg.getBacklog() ); + if( req.address == null || req.address.getPort() == 0 ) + { + req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress(); + } + ssc.register( selector, SelectionKey.OP_ACCEPT, req ); + + synchronized( channels ) + { + channels.put( req.address, ssc ); + } + + getListeners().fireServiceActivated( + this, req.address, req.handler, req.config ); + } + catch( IOException e ) + { + req.exception = e; + } + finally + { + synchronized( req ) + { + req.done = true; + + req.notifyAll(); + } + + if( ssc != null && req.exception != null ) + { + try + { + ssc.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + } + } + } + } + + + private void cancelKeys() + { + if( cancelQueue.isEmpty() ) + { + return; + } + + for( ; ; ) + { + CancellationRequest request; + + synchronized( cancelQueue ) + { + request = ( CancellationRequest ) cancelQueue.pop(); + } + + if( request == null ) + { + break; + } + + ServerSocketChannel ssc; + synchronized( channels ) + { + ssc = ( ServerSocketChannel ) channels.remove( request.address ); + } + + // close the channel + try + { + if( ssc == null ) + { + request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); + } + else + { + SelectionKey key = ssc.keyFor( selector ); + request.registrationRequest = ( RegistrationRequest ) key.attachment(); + key.cancel(); + + selector.wakeup(); // wake up again to trigger thread death + + ssc.close(); + } + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught( e ); + } + finally + { + synchronized( request ) + { + request.done = true; + request.notifyAll(); + } + + if( request.exception == null ) + { + getListeners().fireServiceDeactivated( + this, request.address, + request.registrationRequest.handler, + request.registrationRequest.config ); + } + } + } + } + + private static class RegistrationRequest + { + private InetSocketAddress address; + private final IoHandler handler; + private final IoServiceConfig config; + private IOException exception; + private boolean done; + + private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + this.address = ( InetSocketAddress ) address; + this.handler = handler; + this.config = config; + } + } + + + private static class CancellationRequest + { + private final SocketAddress address; + private boolean done; + private RegistrationRequest registrationRequest; + private RuntimeException exception; + + private CancellationRequest( SocketAddress address ) + { + this.address = address; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java new file mode 100644 index 0000000000..7344f70078 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class MultiThreadSocketConnector extends SocketConnector +{ + /** @noinspection StaticNonFinalField */ + private static volatile int nextId = 0; + + private final Object lock = new Object(); + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + + private final Queue connectQueue = new Queue(); + private final MultiThreadSocketIoProcessor[] ioProcessors; + private final int processorCount; + private final Executor executor; + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + private int workerTimeout = 60; // 1 min. + + /** Create a connector with a single processing thread using a NewThreadExecutor */ + public MultiThreadSocketConnector() + { + this(1, new NewThreadExecutor()); + } + + /** + * Create a connector with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public MultiThreadSocketConnector(int processorCount, Executor executor) + { + if (processorCount < 1) + { + throw new IllegalArgumentException("Must have at least one processor"); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; + + for (int i = 0; i < processorCount; i++) + { + ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); + } + } + + /** + * How many seconds to keep the connection thread alive between connection requests + * + * @return Number of seconds to keep connection thread alive + */ + public int getWorkerTimeout() + { + return workerTimeout; + } + + /** + * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. + * + * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 + */ + public void setWorkerTimeout(int workerTimeout) + { + if (workerTimeout < 0) + { + throw new IllegalArgumentException("Must be >= 0"); + } + this.workerTimeout = workerTimeout; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + if (address == null) + { + throw new NullPointerException("address"); + } + if (handler == null) + { + throw new NullPointerException("handler"); + } + + if (!(address instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected address type: " + + address.getClass()); + } + + if (localAddress != null && !(localAddress instanceof InetSocketAddress)) + { + throw new IllegalArgumentException("Unexpected local address type: " + + localAddress.getClass()); + } + + if (config == null) + { + config = getDefaultConfig(); + } + + SocketChannel ch = null; + boolean success = false; + try + { + ch = SocketChannel.open(); + ch.socket().setReuseAddress(true); + if (localAddress != null) + { + ch.socket().bind(localAddress); + } + + ch.configureBlocking(false); + + if (ch.connect(address)) + { + DefaultConnectFuture future = new DefaultConnectFuture(); + newSession(ch, handler, config, future); + success = true; + return future; + } + + success = true; + } + catch (IOException e) + { + return DefaultConnectFuture.newFailedFuture(e); + } + finally + { + if (!success && ch != null) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + + ConnectionRequest request = new ConnectionRequest(ch, handler, config); + synchronized (lock) + { + try + { + startupWorker(); + } + catch (IOException e) + { + try + { + ch.close(); + } + catch (IOException e2) + { + ExceptionMonitor.getInstance().exceptionCaught(e2); + } + + return DefaultConnectFuture.newFailedFuture(e); + } + } + + synchronized (connectQueue) + { + connectQueue.push(request); + } + selector.wakeup(); + + return request; + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + executor.execute(new NamePreservingRunnable(worker)); + } + } + + private void registerNew() + { + if (connectQueue.isEmpty()) + { + return; + } + + for (; ;) + { + ConnectionRequest req; + synchronized (connectQueue) + { + req = (ConnectionRequest) connectQueue.pop(); + } + + if (req == null) + { + break; + } + + SocketChannel ch = req.channel; + try + { + ch.register(selector, SelectionKey.OP_CONNECT, req); + } + catch (IOException e) + { + req.setException(e); + } + } + } + + private void processSessions(Set keys) + { + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isConnectable()) + { + continue; + } + + SocketChannel ch = (SocketChannel) key.channel(); + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + newSession(ch, entry.handler, entry.config, entry); + success = true; + } + catch (Throwable e) + { + entry.setException(e); + } + finally + { + key.cancel(); + if (!success) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions(Set keys) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isValid()) + { + continue; + } + + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + if (currentTime >= entry.deadline) + { + entry.setException(new ConnectException()); + try + { + key.channel().close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + key.cancel(); + } + } + } + } + + private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + MultiThreadSocketSessionImpl session = + new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(), + config, ch, handler, ch.socket().getRemoteSocketAddress()); + + //new interface +// SocketSessionImpl session = new SocketSessionImpl( +// this, nextProcessor(), getListeners(), +// config, ch, handler, ch.socket().getRemoteSocketAddress() ); + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + + // Set the ConnectFuture of the specified session, which will be + // removed and notified by AbstractIoFilterChain eventually. + session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); + + // Forward the remaining process to the SocketIoProcessor. + session.getIoProcessor().addNew(session); + } + + private MultiThreadSocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + private class Worker implements Runnable + { + private long lastActive = System.currentTimeMillis(); + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName); + + for (; ;) + { + try + { + int nKeys = selector.select(1000); + + registerNew(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + processTimedOutSessions(selector.keys()); + + if (selector.keys().isEmpty()) + { + if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) + { + synchronized (lock) + { + if (selector.keys().isEmpty() && + connectQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + else + { + lastActive = System.currentTimeMillis(); + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + } + } + + private class ConnectionRequest extends DefaultConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoServiceConfig config; + + private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) + { + this.channel = channel; + long timeout; + if (config instanceof IoConnectorConfig) + { + timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); + } + else + { + timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); + } + this.deadline = System.currentTimeMillis() + timeout; + this.handler = handler; + this.config = config; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java new file mode 100644 index 0000000000..67b8c8d820 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import java.io.IOException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.util.Queue; + +/** + * An {@link IoFilterChain} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + */ +class MultiThreadSocketFilterChain extends AbstractIoFilterChain { + + MultiThreadSocketFilterChain( IoSession parent ) + { + super( parent ); + } + + protected void doWrite( IoSession session, WriteRequest writeRequest ) + { + MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; + Queue writeRequestQueue = s.getWriteRequestQueue(); + + // SocketIoProcessor.doFlush() will reset it after write is finished + // because the buffer will be passed with messageSent event. + ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); + synchronized( writeRequestQueue ) + { + writeRequestQueue.push( writeRequest ); + if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) + { + // Notify SocketIoProcessor only when writeRequestQueue was empty. + s.getIoProcessor().flush( s ); + } + } + } + + protected void doClose( IoSession session ) throws IOException + { + MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; + s.getIoProcessor().remove( s ); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java new file mode 100644 index 0000000000..c23ad8686f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java @@ -0,0 +1,1026 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.WriteTimeoutException; +import org.apache.mina.util.IdentityHashSet; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.Queue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $, + */ +class MultiThreadSocketIoProcessor extends SocketIoProcessor +{ + Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class); + Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader"); + Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer"); + + private static final long SELECTOR_TIMEOUT = 1000L; + + private int MAX_READ_BYTES_PER_SESSION = 524288; //512K + private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K + + private final Object readLock = new Object(); + private final Object writeLock = new Object(); + + private final String threadName; + private final Executor executor; + + private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private volatile Selector selector, writeSelector; + + private final Queue newSessions = new Queue(); + private final Queue removingSessions = new Queue(); + private final BlockingQueue flushingSessions = new LinkedBlockingQueue(); + private final IdentityHashSet flushingSessionsSet = new IdentityHashSet(); + + private final Queue trafficControllingSessions = new Queue(); + + private ReadWorker readWorker; + private WriteWorker writeWorker; + private long lastIdleReadCheckTime = System.currentTimeMillis(); + private long lastIdleWriteCheckTime = System.currentTimeMillis(); + + MultiThreadSocketIoProcessor(String threadName, Executor executor) + { + super(threadName, executor); + this.threadName = threadName; + this.executor = executor; + } + + void addNew(SocketSessionImpl session) throws IOException + { + synchronized (newSessions) + { + newSessions.push(session); + } + + startupWorker(); + + selector.wakeup(); + writeSelector.wakeup(); + } + + void remove(SocketSessionImpl session) throws IOException + { + scheduleRemove(session); + startupWorker(); + selector.wakeup(); + } + + private void startupWorker() throws IOException + { + synchronized (readLock) + { + if (readWorker == null) + { + selector = Selector.open(); + readWorker = new ReadWorker(); + executor.execute(new NamePreservingRunnable(readWorker)); + } + } + + synchronized (writeLock) + { + if (writeWorker == null) + { + writeSelector = Selector.open(); + writeWorker = new WriteWorker(); + executor.execute(new NamePreservingRunnable(writeWorker)); + } + } + + } + + void flush(SocketSessionImpl session) + { + scheduleFlush(session); + Selector selector = this.writeSelector; + + if (selector != null) + { + selector.wakeup(); + } + } + + void updateTrafficMask(SocketSessionImpl session) + { + scheduleTrafficControl(session); + Selector selector = this.selector; + if (selector != null) + { + selector.wakeup(); + } + } + + private void scheduleRemove(SocketSessionImpl session) + { + synchronized (removingSessions) + { + removingSessions.push(session); + } + } + + private void scheduleFlush(SocketSessionImpl session) + { + synchronized (flushingSessionsSet) + { + //if flushingSessions grows to contain Integer.MAX_VALUE sessions + // then this will fail. + if (flushingSessionsSet.add(session)) + { + flushingSessions.offer(session); + } + } + } + + private void scheduleTrafficControl(SocketSessionImpl session) + { + synchronized (trafficControllingSessions) + { + trafficControllingSessions.push(session); + } + } + + private void doAddNewReader() throws InterruptedException + { + if (newSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized (newSessions) + { + session = (MultiThreadSocketSessionImpl) newSessions.peek(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + + + try + { + + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); + + //System.out.println("ReadDebug:"+"Awaiting Registration"); + session.awaitRegistration(); + sessionCreated(session); + } + catch (IOException e) + { + // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute + // and call ConnectFuture.setException(). + session.getFilterChain().fireExceptionCaught(session, e); + } + } + } + + + private void doAddNewWrite() throws InterruptedException + { + if (newSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized (newSessions) + { + session = (MultiThreadSocketSessionImpl) newSessions.peek(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + + try + { + ch.configureBlocking(false); + synchronized (flushingSessionsSet) + { + flushingSessionsSet.add(session); + } + + session.setWriteSelectionKey(ch.register(writeSelector, + SelectionKey.OP_WRITE, + session)); + + //System.out.println("WriteDebug:"+"Awaiting Registration"); + session.awaitRegistration(); + sessionCreated(session); + } + catch (IOException e) + { + + // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute + // and call ConnectFuture.setException(). + session.getFilterChain().fireExceptionCaught(session, e); + } + } + } + + + private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException + { + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; + synchronized (newSessions) + { + if (!session.created()) + { + _logger.debug("Popping new session"); + newSessions.pop(); + + // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here + // in AbstractIoFilterChain.fireSessionOpened(). + session.getServiceListeners().fireSessionCreated(session); + + session.doneCreation(); + } + } + } + + private void doRemove() + { + if (removingSessions.isEmpty()) + { + return; + } + + for (; ;) + { + MultiThreadSocketSessionImpl session; + + synchronized (removingSessions) + { + session = (MultiThreadSocketSessionImpl) removingSessions.pop(); + } + + if (session == null) + { + break; + } + + SocketChannel ch = session.getChannel(); + SelectionKey key = session.getReadSelectionKey(); + SelectionKey writeKey = session.getWriteSelectionKey(); + + // Retry later if session is not yet fully initialized. + // (In case that Session.close() is called before addSession() is processed) + if (key == null || writeKey == null) + { + scheduleRemove(session); + break; + } + // skip if channel is already closed + if (!key.isValid() || !writeKey.isValid()) + { + continue; + } + + try + { + //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); + synchronized (readLock) + { + key.cancel(); + } + synchronized (writeLock) + { + writeKey.cancel(); + } + ch.close(); + } + catch (IOException e) + { + session.getFilterChain().fireExceptionCaught(session, e); + } + finally + { + releaseWriteBuffers(session); + session.getServiceListeners().fireSessionDestroyed(session); + } + } + } + + private void processRead(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); + + synchronized (readLock) + { + if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) + { + read(session); + } + } + + } + + selectedKeys.clear(); + } + + private void processWrite(Set selectedKeys) + { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + + synchronized (writeLock) + { + if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) + { + + // Clear OP_WRITE + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + + synchronized (flushingSessionsSet) + { + flushingSessions.offer(session); + } + } + } + } + + selectedKeys.clear(); + } + + private void read(SocketSessionImpl session) + { + + //if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); + } + + int totalReadBytes = 0; + + while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) + { + ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); + SocketChannel ch = session.getChannel(); + + try + { + buf.clear(); + + int readBytes = 0; + int ret; + + try + { + while ((ret = ch.read(buf.buf())) > 0) + { + readBytes += ret; + totalReadBytes += ret; + } + } + finally + { + buf.flip(); + } + + + if (readBytes > 0) + { + session.increaseReadBytes(readBytes); + + session.getFilterChain().fireMessageReceived(session, buf); + buf = null; + } + + if (ret <= 0) + { + if (ret == 0) + { + if (readBytes == session.getReadBufferSize()) + { + continue; + } + } + else + { + scheduleRemove(session); + } + + break; + } + } + catch (Throwable e) + { + if (e instanceof IOException) + { + scheduleRemove(session); + } + session.getFilterChain().fireExceptionCaught(session, e); + + //Stop Reading this session. + return; + } + finally + { + if (buf != null) + { + buf.release(); + } + } + }//for + + // if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); + } + } + + + private void notifyReadIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleReadCheckTime) >= 1000) + { + lastIdleReadCheckTime = currentTime; + Set keys = selector.keys(); + if (keys != null) + { + for (Iterator it = keys.iterator(); it.hasNext();) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + notifyReadIdleness(session, currentTime); + } + } + } + } + + private void notifyWriteIdleness() + { + // process idle sessions + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastIdleWriteCheckTime) >= 1000) + { + lastIdleWriteCheckTime = currentTime; + Set keys = writeSelector.keys(); + if (keys != null) + { + for (Iterator it = keys.iterator(); it.hasNext();) + { + SelectionKey key = (SelectionKey) it.next(); + SocketSessionImpl session = (SocketSessionImpl) key.attachment(); + notifyWriteIdleness(session, currentTime); + } + } + } + } + + private void notifyReadIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); + + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); + } + + private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) + { + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleness0( + session, currentTime, + session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); + + notifyWriteTimeout(session, currentTime, session + .getWriteTimeoutInMillis(), session.getLastWriteTime()); + } + + private void notifyIdleness0(SocketSessionImpl session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) + { + if (idleTime > 0 && lastIoTime != 0 + && (currentTime - lastIoTime) >= idleTime) + { + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); + } + } + + private void notifyWriteTimeout(SocketSessionImpl session, + long currentTime, + long writeTimeout, long lastIoTime) + { + + MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; + SelectionKey key = sesh.getWriteSelectionKey(); + + synchronized (writeLock) + { + if (writeTimeout > 0 + && (currentTime - lastIoTime) >= writeTimeout + && key != null && key.isValid() + && (key.interestOps() & SelectionKey.OP_WRITE) != 0) + { + session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); + } + } + } + + private SocketSessionImpl getNextFlushingSession() + { + return (SocketSessionImpl) flushingSessions.poll(); + } + + private void releaseSession(SocketSessionImpl session) + { + synchronized (session.getWriteRequestQueue()) + { + synchronized (flushingSessionsSet) + { + if (session.getScheduledWriteRequests() > 0) + { + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); + } + flushingSessions.offer(session); + } + else + { + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); + } + flushingSessionsSet.remove(session); + } + } + } + } + + private void releaseWriteBuffers(SocketSessionImpl session) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + WriteRequest req; + + //Should this be synchronized? + synchronized (writeRequestQueue) + { + while ((req = (WriteRequest) writeRequestQueue.pop()) != null) + { + try + { + ((ByteBuffer) req.getMessage()).release(); + } + catch (IllegalStateException e) + { + session.getFilterChain().fireExceptionCaught(session, e); + } + finally + { + req.getFuture().setWritten(false); + } + } + } + } + + private void doFlush() + { + MultiThreadSocketSessionImpl session; + + while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) + { + if (!session.isConnected()) + { + releaseWriteBuffers(session); + releaseSession(session); + continue; + } + + SelectionKey key = session.getWriteSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.write() is called before addSession() is processed) + if (key == null) + { + scheduleFlush(session); + releaseSession(session); + continue; + } + // skip if channel is already closed + if (!key.isValid()) + { + releaseSession(session); + continue; + } + + try + { + if (doFlush(session)) + { + releaseSession(session); + } + } + catch (IOException e) + { + releaseSession(session); + scheduleRemove(session); + session.getFilterChain().fireExceptionCaught(session, e); + } + + } + + } + + private boolean doFlush(SocketSessionImpl sessionParam) throws IOException + { + MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; + // Clear OP_WRITE + SelectionKey key = session.getWriteSelectionKey(); + synchronized (writeLock) + { + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + SocketChannel ch = session.getChannel(); + Queue writeRequestQueue = session.getWriteRequestQueue(); + + long totalFlushedBytes = 0; + while (true) + { + WriteRequest req; + + synchronized (writeRequestQueue) + { + req = (WriteRequest) writeRequestQueue.first(); + } + + if (req == null) + { + break; + } + + ByteBuffer buf = (ByteBuffer) req.getMessage(); + if (buf.remaining() == 0) + { + synchronized (writeRequestQueue) + { + writeRequestQueue.pop(); + } + + session.increaseWrittenMessages(); + + buf.reset(); + session.getFilterChain().fireMessageSent(session, req); + continue; + } + + + int writtenBytes = 0; + + // Reported as DIRMINA-362 + //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. + if (key.isWritable()) + { + writtenBytes = ch.write(buf.buf()); + totalFlushedBytes += writtenBytes; + } + + if (writtenBytes > 0) + { + session.increaseWrittenBytes(writtenBytes); + } + + if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) + { + // Kernel buffer is full + synchronized (writeLock) + { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); + } + return false; + } + } + + if (_loggerWrite.isDebugEnabled()) + { + //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); + } + return true; + } + + private void doUpdateTrafficMask() + { + if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) + { + return; + } + + // Synchronize over entire operation as this method should be called + // from both read and write thread and we don't want the order of the + // updates to get changed. + trafficMaskUpdateLock.lock(); + try + { + for (; ;) + { + MultiThreadSocketSessionImpl session; + + session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); + + if (session == null) + { + break; + } + + SelectionKey key = session.getReadSelectionKey(); + // Retry later if session is not yet fully initialized. + // (In case that Session.suspend??() or session.resume??() is + // called before addSession() is processed) + if (key == null) + { + scheduleTrafficControl(session); + break; + } + // skip if channel is already closed + if (!key.isValid()) + { + continue; + } + + // The normal is OP_READ and, if there are write requests in the + // session's write queue, set OP_WRITE to trigger flushing. + + //Sset to Read and Write if there is nothing then the cost + // is one loop through the flusher. + int ops = SelectionKey.OP_READ; + + // Now mask the preferred ops with the mask of the current session + int mask = session.getTrafficMask().getInterestOps(); + synchronized (readLock) + { + key.interestOps(ops & mask); + } + //Change key to the WriteSelection Key + key = session.getWriteSelectionKey(); + if (key != null && key.isValid()) + { + Queue writeRequestQueue = session.getWriteRequestQueue(); + synchronized (writeRequestQueue) + { + if (!writeRequestQueue.isEmpty()) + { + ops = SelectionKey.OP_WRITE; + synchronized (writeLock) + { + key.interestOps(ops & mask); + } + } + } + } + } + } + finally + { + trafficMaskUpdateLock.unlock(); + } + + } + + private class WriteWorker implements Runnable + { + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); + + //System.out.println("WriteDebug:"+"Startup"); + for (; ;) + { + try + { + int nKeys = writeSelector.select(SELECTOR_TIMEOUT); + + doAddNewWrite(); + doUpdateTrafficMask(); + + if (nKeys > 0) + { + //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); + processWrite(writeSelector.selectedKeys()); + } + else + { + //System.out.println("WriteDebug:"+"No keys from writeselector"); + } + + doRemove(); + notifyWriteIdleness(); + + if (flushingSessionsSet.size() > 0) + { + doFlush(); + } + + if (writeSelector.keys().isEmpty()) + { + synchronized (writeLock) + { + + if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) + { + writeWorker = null; + try + { + writeSelector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + writeSelector = null; + } + + break; + } + } + } + + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + //System.out.println("WriteDebug:"+"Shutdown"); + } + + } + + private class ReadWorker implements Runnable + { + + public void run() + { + Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); + + //System.out.println("ReadDebug:"+"Startup"); + for (; ;) + { + try + { + int nKeys = selector.select(SELECTOR_TIMEOUT); + + doAddNewReader(); + doUpdateTrafficMask(); + + if (nKeys > 0) + { + //System.out.println("ReadDebug:"+nKeys + " keys from selector"); + + processRead(selector.selectedKeys()); + } + else + { + //System.out.println("ReadDebug:"+"No keys from selector"); + } + + + doRemove(); + notifyReadIdleness(); + + if (selector.keys().isEmpty()) + { + + synchronized (readLock) + { + if (selector.keys().isEmpty() && newSessions.isEmpty()) + { + readWorker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + + break; + } + } + } + } + catch (Throwable t) + { + ExceptionMonitor.getInstance().exceptionCaught(t); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + //System.out.println("ReadDebug:"+"Shutdown"); + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java new file mode 100644 index 0000000000..043d4800b6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; + +/** + * An {@link IoConnectorConfig} for {@link SocketConnector}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl +{ + private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; + private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false; + private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false; + private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false; + + private static boolean DEFAULT_REUSE_ADDRESS; + private static int DEFAULT_RECEIVE_BUFFER_SIZE; + private static int DEFAULT_SEND_BUFFER_SIZE; + private static int DEFAULT_TRAFFIC_CLASS; + private static boolean DEFAULT_KEEP_ALIVE; + private static boolean DEFAULT_OOB_INLINE; + private static int DEFAULT_SO_LINGER; + private static boolean DEFAULT_TCP_NO_DELAY; + + static + { + initialize(); + } + + private static void initialize() + { + Socket socket = null; + + socket = new Socket(); + + try + { + DEFAULT_REUSE_ADDRESS = socket.getReuseAddress(); + DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize(); + DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize(); + DEFAULT_KEEP_ALIVE = socket.getKeepAlive(); + DEFAULT_OOB_INLINE = socket.getOOBInline(); + DEFAULT_SO_LINGER = socket.getSoLinger(); + DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay(); + + // Check if setReceiveBufferSize is supported. + try + { + socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE); + SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true; + } + catch( SocketException e ) + { + SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; + } + + // Check if setSendBufferSize is supported. + try + { + socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE); + SET_SEND_BUFFER_SIZE_AVAILABLE = true; + } + catch( SocketException e ) + { + SET_SEND_BUFFER_SIZE_AVAILABLE = false; + } + + // Check if getTrafficClass is supported. + try + { + DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass(); + GET_TRAFFIC_CLASS_AVAILABLE = true; + } + catch( SocketException e ) + { + GET_TRAFFIC_CLASS_AVAILABLE = false; + DEFAULT_TRAFFIC_CLASS = 0; + } + } + catch( SocketException e ) + { + throw new ExceptionInInitializerError(e); + } + finally + { + if( socket != null ) + { + try + { + socket.close(); + } + catch( IOException e ) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + public static boolean isSetReceiveBufferSizeAvailable() { + return SET_RECEIVE_BUFFER_SIZE_AVAILABLE; + } + + public static boolean isSetSendBufferSizeAvailable() { + return SET_SEND_BUFFER_SIZE_AVAILABLE; + } + + public static boolean isGetTrafficClassAvailable() { + return GET_TRAFFIC_CLASS_AVAILABLE; + } + + public static boolean isSetTrafficClassAvailable() { + return SET_TRAFFIC_CLASS_AVAILABLE; + } + + private boolean reuseAddress = DEFAULT_REUSE_ADDRESS; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private boolean keepAlive = DEFAULT_KEEP_ALIVE; + private boolean oobInline = DEFAULT_OOB_INLINE; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + + /** + * Creates a new instance. + */ + MultiThreadSocketSessionConfigImpl() + { + } + + public boolean isReuseAddress() + { + return reuseAddress; + } + + public void setReuseAddress( boolean reuseAddress ) + { + this.reuseAddress = reuseAddress; + } + + public int getReceiveBufferSize() + { + return receiveBufferSize; + } + + public void setReceiveBufferSize( int receiveBufferSize ) + { + this.receiveBufferSize = receiveBufferSize; + } + + public int getSendBufferSize() + { + return sendBufferSize; + } + + public void setSendBufferSize( int sendBufferSize ) + { + this.sendBufferSize = sendBufferSize; + } + + public int getTrafficClass() + { + return trafficClass; + } + + public void setTrafficClass( int trafficClass ) + { + this.trafficClass = trafficClass; + } + + public boolean isKeepAlive() + { + return keepAlive; + } + + public void setKeepAlive( boolean keepAlive ) + { + this.keepAlive = keepAlive; + } + + public boolean isOobInline() + { + return oobInline; + } + + public void setOobInline( boolean oobInline ) + { + this.oobInline = oobInline; + } + + public int getSoLinger() + { + return soLinger; + } + + public void setSoLinger( int soLinger ) + { + this.soLinger = soLinger; + } + + public boolean isTcpNoDelay() + { + return tcpNoDelay; + } + + public void setTcpNoDelay( boolean tcpNoDelay ) + { + this.tcpNoDelay = tcpNoDelay; + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java new file mode 100644 index 0000000000..be4a2d289d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoService; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.RuntimeIOException; +import org.apache.mina.common.TransportType; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.IoServiceListenerSupport; +import org.apache.mina.util.Queue; + +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An {@link IoSession} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +class MultiThreadSocketSessionImpl extends SocketSessionImpl +{ + private final IoService manager; + private final IoServiceConfig serviceConfig; + private final SocketSessionConfig config = new SessionConfigImpl(); + private final MultiThreadSocketIoProcessor ioProcessor; + private final MultiThreadSocketFilterChain filterChain; + private final SocketChannel ch; + private final Queue writeRequestQueue; + private final IoHandler handler; + private final SocketAddress remoteAddress; + private final SocketAddress localAddress; + private final SocketAddress serviceAddress; + private final IoServiceListenerSupport serviceListeners; + private SelectionKey readKey, writeKey; + private int readBufferSize; + private CountDownLatch registeredReadyLatch = new CountDownLatch(2); + private AtomicBoolean created = new AtomicBoolean(false); + + /** + * Creates a new instance. + */ + MultiThreadSocketSessionImpl( IoService manager, + SocketIoProcessor ioProcessor, + IoServiceListenerSupport listeners, + IoServiceConfig serviceConfig, + SocketChannel ch, + IoHandler defaultHandler, + SocketAddress serviceAddress ) + { + super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress); + this.manager = manager; + this.serviceListeners = listeners; + this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor; + this.filterChain = new MultiThreadSocketFilterChain(this); + this.ch = ch; + this.writeRequestQueue = new Queue(); + this.handler = defaultHandler; + this.remoteAddress = ch.socket().getRemoteSocketAddress(); + this.localAddress = ch.socket().getLocalSocketAddress(); + this.serviceAddress = serviceAddress; + this.serviceConfig = serviceConfig; + + // Apply the initial session settings + IoSessionConfig sessionConfig = serviceConfig.getSessionConfig(); + if( sessionConfig instanceof SocketSessionConfig ) + { + SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig; + this.config.setKeepAlive( cfg.isKeepAlive() ); + this.config.setOobInline( cfg.isOobInline() ); + this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() ); + this.readBufferSize = cfg.getReceiveBufferSize(); + this.config.setReuseAddress( cfg.isReuseAddress() ); + this.config.setSendBufferSize( cfg.getSendBufferSize() ); + this.config.setSoLinger( cfg.getSoLinger() ); + this.config.setTcpNoDelay( cfg.isTcpNoDelay() ); + + if( this.config.getTrafficClass() != cfg.getTrafficClass() ) + { + this.config.setTrafficClass( cfg.getTrafficClass() ); + } + } + } + + void awaitRegistration() throws InterruptedException + { + registeredReadyLatch.countDown(); + + registeredReadyLatch.await(); + } + + boolean created() throws InterruptedException + { + return created.get(); + } + + void doneCreation() + { + created.getAndSet(true); + } + + public IoService getService() + { + return manager; + } + + public IoServiceConfig getServiceConfig() + { + return serviceConfig; + } + + public IoSessionConfig getConfig() + { + return config; + } + + SocketIoProcessor getIoProcessor() + { + return ioProcessor; + } + + public IoFilterChain getFilterChain() + { + return filterChain; + } + + SocketChannel getChannel() + { + return ch; + } + + IoServiceListenerSupport getServiceListeners() + { + return serviceListeners; + } + + SelectionKey getSelectionKey() + { + return readKey; + } + + SelectionKey getReadSelectionKey() + { + return readKey; + } + + SelectionKey getWriteSelectionKey() + { + return writeKey; + } + + void setSelectionKey(SelectionKey key) + { + this.readKey = key; + } + + void setWriteSelectionKey(SelectionKey key) + { + this.writeKey = key; + } + + public IoHandler getHandler() + { + return handler; + } + + protected void close0() + { + filterChain.fireFilterClose( this ); + } + + Queue getWriteRequestQueue() + { + return writeRequestQueue; + } + + /** + @return int Number of write scheduled write requests + @deprecated + */ + public int getScheduledWriteMessages() + { + return getScheduledWriteRequests(); + } + + public int getScheduledWriteRequests() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.size(); + } + } + + public int getScheduledWriteBytes() + { + synchronized( writeRequestQueue ) + { + return writeRequestQueue.byteSize(); + } + } + + protected void write0( WriteRequest writeRequest ) + { + filterChain.fireFilterWrite( this, writeRequest ); + } + + public TransportType getTransportType() + { + return TransportType.SOCKET; + } + + public SocketAddress getRemoteAddress() + { + //This is what I had previously +// return ch.socket().getRemoteSocketAddress(); + return remoteAddress; + } + + public SocketAddress getLocalAddress() + { + //This is what I had previously +// return ch.socket().getLocalSocketAddress(); + return localAddress; + } + + public SocketAddress getServiceAddress() + { + return serviceAddress; + } + + protected void updateTrafficMask() + { + this.ioProcessor.updateTrafficMask( this ); + } + + int getReadBufferSize() + { + return readBufferSize; + } + + private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig + { + public boolean isKeepAlive() + { + try + { + return ch.socket().getKeepAlive(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setKeepAlive( boolean on ) + { + try + { + ch.socket().setKeepAlive( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isOobInline() + { + try + { + return ch.socket().getOOBInline(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setOobInline( boolean on ) + { + try + { + ch.socket().setOOBInline( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isReuseAddress() + { + try + { + return ch.socket().getReuseAddress(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReuseAddress( boolean on ) + { + try + { + ch.socket().setReuseAddress( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getSoLinger() + { + try + { + return ch.socket().getSoLinger(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSoLinger( int linger ) + { + try + { + if( linger < 0 ) + { + ch.socket().setSoLinger( false, 0 ); + } + else + { + ch.socket().setSoLinger( true, linger ); + } + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public boolean isTcpNoDelay() + { + try + { + return ch.socket().getTcpNoDelay(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setTcpNoDelay( boolean on ) + { + try + { + ch.socket().setTcpNoDelay( on ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public int getTrafficClass() + { + if( SocketSessionConfigImpl.isGetTrafficClassAvailable() ) + { + try + { + return ch.socket().getTrafficClass(); + } + catch( SocketException e ) + { + // Throw an exception only when setTrafficClass is also available. + if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) + { + throw new RuntimeIOException( e ); + } + } + } + + return 0; + } + + public void setTrafficClass( int tc ) + { + if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) + { + try + { + ch.socket().setTrafficClass( tc ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + + public int getSendBufferSize() + { + try + { + return ch.socket().getSendBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setSendBufferSize( int size ) + { + if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() ) + { + try + { + ch.socket().setSendBufferSize( size ); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + + public int getReceiveBufferSize() + { + try + { + return ch.socket().getReceiveBufferSize(); + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + + public void setReceiveBufferSize( int size ) + { + if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() ) + { + try + { + ch.socket().setReceiveBufferSize( size ); + MultiThreadSocketSessionImpl.this.readBufferSize = size; + } + catch( SocketException e ) + { + throw new RuntimeIOException( e ); + } + } + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java new file mode 100644 index 0000000000..a23e546af5 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.support.AbstractIoFilterChain; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.BaseIoConnectorConfig; +import org.apache.mina.common.support.BaseIoSessionConfig; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.vmpipe.support.VmPipe; +import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker; +import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl; +import org.apache.mina.util.AnonymousSocketAddress; + +/** + * Connects to {@link IoHandler}s which is bound on the specified + * {@link VmPipeAddress}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ + */ +public class QpidVmPipeConnector extends VmPipeConnector +{ + private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {}; + private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig() + { + public IoSessionConfig getSessionConfig() + { + return CONFIG; + } + }; + + /** + * Creates a new instance. + */ + public QpidVmPipeConnector() + { + } + + public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config ) + { + return connect( address, null, handler, config ); + } + + public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config ) + { + if( address == null ) + throw new NullPointerException( "address" ); + if( handler == null ) + throw new NullPointerException( "handler" ); + if( ! ( address instanceof VmPipeAddress ) ) + throw new IllegalArgumentException( + "address must be VmPipeAddress." ); + + if( config == null ) + { + config = getDefaultConfig(); + } + + VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address ); + if( entry == null ) + { + return DefaultConnectFuture.newFailedFuture( + new IOException( "Endpoint unavailable: " + address ) ); + } + + DefaultConnectFuture future = new DefaultConnectFuture(); + VmPipeSessionImpl localSession = + new VmPipeSessionImpl( + this, + config, + getListeners(), + new Object(), // lock + new AnonymousSocketAddress(), + handler, + entry ); + + // initialize acceptor session + VmPipeSessionImpl remoteSession = localSession.getRemoteSession(); + try + { + IoFilterChain filterChain = remoteSession.getFilterChain(); + entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain ); + entry.getConfig().getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + entry.getListeners().fireSessionCreated( remoteSession ); + VmPipeIdleStatusChecker.getInstance().addSession( remoteSession ); + } + catch( Throwable t ) + { + ExceptionMonitor.getInstance().exceptionCaught( t ); + remoteSession.close(); + } + + + // initialize connector session + try + { + IoFilterChain filterChain = localSession.getFilterChain(); + this.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getFilterChainBuilder().buildFilterChain( filterChain ); + config.getThreadModel().buildFilterChain( filterChain ); + + // The following sentences don't throw any exceptions. + localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future ); + getListeners().fireSessionCreated( localSession ); + VmPipeIdleStatusChecker.getInstance().addSession( localSession); + } + catch( Throwable t ) + { + future.setException( t ); + } + + + + return future; + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 2f6290b55a..ef9420ba87 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -54,7 +54,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); - return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), getMessageAsShortString(),_classId,_methodId)); + return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId)); } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index ca9c9f9dc4..8ef6facef1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -62,10 +62,9 @@ public class AMQConnectionException extends AMQException MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor)); return new AMQFrame(0, reg.createConnectionCloseBody(getErrorCode().getCode(), - getMessageAsShortString(), + new AMQShortString(getMessage()), _classId, _methodId)); } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java index 86d439d269..b0c6fccc9e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -20,7 +20,6 @@ */ package org.apache.qpid; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQConstant; /** @@ -122,19 +121,4 @@ public class AMQException extends Exception return newAMQE; } - - /** - * Truncates the exception message to 255 characters if its length exceeds 255. - * - * @return exception message - */ - public AMQShortString getMessageAsShortString() - { - String message = getMessage(); - if (message != null && message.length() > AMQShortString.MAX_LENGTH) - { - message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "..."; - } - return new AMQShortString(message); - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java new file mode 100644 index 0000000000..5423bbb68f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.mina.MinaHandler; + +import static org.apache.qpid.transport.util.Functions.str; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + + +/** + * ToyBroker + * + * @author Rafael H. Schloming + */ + +class ToyBroker extends SessionDelegate +{ + + private ToyExchange exchange; + private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); + + public ToyBroker(ToyExchange exchange) + { + this.exchange = exchange; + } + + public void messageAcquire(Session context, MessageAcquire struct) + { + System.out.println("\n==================> messageAcquire " ); + context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); + } + + @Override public void queueDeclare(Session ssn, QueueDeclare qd) + { + exchange.createQueue(qd.getQueue()); + System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); + } + + @Override public void exchangeBind(Session ssn, ExchangeBind qb) + { + exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); + } + + @Override public void queueQuery(Session ssn, QueueQuery qq) + { + QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); + ssn.executionResult((int) qq.getId(), result); + } + + @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) + { + Consumer c = new Consumer(); + c._queueName = ms.getQueue(); + consumers.put(ms.getDestination(),c); + System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); + } + + @Override public void messageFlow(Session ssn,MessageFlow struct) + { + Consumer c = consumers.get(struct.getDestination()); + c._credit = struct.getValue(); + System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); + } + + @Override public void messageFlush(Session ssn,MessageFlush struct) + { + System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); + checkAndSendMessagesToConsumer(ssn,struct.getDestination()); + } + + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + { + String dest = xfr.getDestination(); + System.out.println("received transfer " + dest); + Header header = xfr.getHeader(); + DeliveryProperties props = header.get(DeliveryProperties.class); + if (props != null) + { + System.out.println("received headers routing_key " + props.getRoutingKey()); + } + + MessageProperties mp = header.get(MessageProperties.class); + System.out.println("MP: " + mp); + if (mp != null) + { + System.out.println(mp.getApplicationHeaders()); + } + + if (exchange.route(dest,props == null ? null : props.getRoutingKey(),xfr)) + { + System.out.println("queued " + xfr); + dispatchMessages(ssn); + } + else + { + + if (props == null || !props.getDiscardUnroutable()) + { + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); + } + } + ssn.processed(xfr); + } + + private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) + { + System.out.println("\n==================> Transfering message to: " +dest + "\n"); + ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + m.getHeader(), m.getBody()); + } + + private void dispatchMessages(Session ssn) + { + for (String dest: consumers.keySet()) + { + checkAndSendMessagesToConsumer(ssn,dest); + } + } + + private void checkAndSendMessagesToConsumer(Session ssn,String dest) + { + Consumer c = consumers.get(dest); + LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); + MessageTransfer m = queue.poll(); + while (m != null && c._credit>0) + { + transferMessageToPeer(ssn,dest,m); + c._credit--; + m = queue.poll(); + } + } + + // ugly, but who cares :) + // assumes unit is always no of messages, not bytes + // assumes it's credit mode and not window + private static class Consumer + { + long _credit; + String _queueName; + } + + private static final class ToyBrokerSession extends Session + { + + public ToyBrokerSession(Connection connection, Binary name, long expiry, ToyExchange exchange) + { + super(connection, new ToyBroker(exchange), name, expiry); + } + } + + public static final void main(String[] args) throws IOException + { + final ToyExchange exchange = new ToyExchange(); + ConnectionDelegate delegate = new ServerDelegate() + { + @Override + public void init(Connection conn, ProtocolHeader hdr) + { + conn.setSessionFactory(new Connection.SessionFactory() + { + public Session newSession(Connection conn, Binary name, long expiry) + { + return new ToyBrokerSession(conn, name, expiry, exchange); + } + }); + + super.init(conn, hdr); //To change body of overridden methods use File | Settings | File Templates. + } + + }; + + MinaHandler.accept("0.0.0.0", 5672, delegate); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java new file mode 100644 index 0000000000..5b2db10613 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid; + +import java.nio.*; +import java.util.*; + +import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.mina.MinaHandler; + + +/** + * ToyClient + * + * @author Rafael H. Schloming + */ + +class ToyClient implements SessionListener +{ + public void opened(Session ssn) {} + + public void resumed(Session ssn) {} + + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void message(Session ssn, MessageTransfer xfr) + { + System.out.println("msg: " + xfr); + } + + public void closed(Session ssn) {} + + public static final void main(String[] args) + { + Connection conn = new Connection(); + conn.connect("0.0.0.0", 5672, null, "guest", "guest", false); + Session ssn = conn.createSession(); + ssn.setSessionListener(new ToyClient()); + + ssn.queueDeclare("asdf", null, null); + ssn.sync(); + + Map<String,Object> nested = new LinkedHashMap<String,Object>(); + nested.put("list", Arrays.asList("one", "two", "three")); + Map<String,Object> map = new LinkedHashMap<String,Object>(); + + map.put("str", "this is a string"); + + map.put("+int", 3); + map.put("-int", -3); + map.put("maxint", Integer.MAX_VALUE); + map.put("minint", Integer.MIN_VALUE); + + map.put("+short", (short) 1); + map.put("-short", (short) -1); + map.put("maxshort", (short) Short.MAX_VALUE); + map.put("minshort", (short) Short.MIN_VALUE); + + map.put("float", (float) 3.3); + map.put("double", 4.9); + map.put("char", 'c'); + + map.put("table", nested); + map.put("list", Arrays.asList(1, 2, 3)); + map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + + ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties(), + new MessageProperties() + .setApplicationHeaders(map)), + "this is the data"); + + ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + null, + "this should be rejected"); + ssn.sync(); + + Future<QueueQueryResult> future = ssn.queueQuery("asdf"); + System.out.println(future.get().getQueue()); + ssn.sync(); + ssn.close(); + conn.close(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java new file mode 100644 index 0000000000..da6aed9629 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java @@ -0,0 +1,154 @@ +package org.apache.qpid; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.qpid.transport.MessageTransfer; + + +public class ToyExchange +{ + final static String DIRECT = "amq.direct"; + final static String TOPIC = "amq.topic"; + + private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); + private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); + private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>(); + + public void createQueue(String name) + { + queues.put(name, new LinkedBlockingQueue<MessageTransfer>()); + } + + public LinkedBlockingQueue<MessageTransfer> getQueue(String name) + { + return queues.get(name); + } + + public void bindQueue(String type,String binding,String queueName) + { + LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName); + binding = normalizeKey(binding); + if(DIRECT.equals(type)) + { + + if (directEx.containsKey(binding)) + { + List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding); + list.add(queue); + } + else + { + List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); + list.add(queue); + directEx.put(binding,list); + } + } + else + { + if (topicEx.containsKey(binding)) + { + List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding); + list.add(queue); + } + else + { + List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); + list.add(queue); + topicEx.put(binding,list); + } + } + } + + public boolean route(String dest, String routingKey, MessageTransfer msg) + { + List<LinkedBlockingQueue<MessageTransfer>> queues; + if(DIRECT.equals(dest)) + { + queues = directEx.get(routingKey); + } + else + { + queues = matchWildCard(routingKey); + } + if(queues != null && queues.size()>0) + { + System.out.println("Message stored in " + queues.size() + " queues"); + storeMessage(msg,queues); + return true; + } + else + { + System.out.println("Message unroutable " + msg); + return false; + } + } + + private String normalizeKey(String routingKey) + { + if(routingKey.indexOf(".*")>1) + { + return routingKey.substring(0,routingKey.indexOf(".*")); + } + else + { + return routingKey; + } + } + + private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey) + { + List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>(); + + for(String key: topicEx.keySet()) + { + Pattern p = Pattern.compile(key); + Matcher m = p.matcher(routingKey); + if (m.find()) + { + for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key)) + { + selected.add(queue); + } + } + } + + return selected; + } + + private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected) + { + for(LinkedBlockingQueue<MessageTransfer> queue : selected) + { + queue.offer(msg); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 36f9a1ae2d..0dd21238a7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -23,7 +23,7 @@ package org.apache.qpid.configuration; */ public class ClientProperties { - + /** * Currently with Qpid it is not possible to change the client ID. * If one is not specified upon connection construction, an id is generated automatically. @@ -68,40 +68,67 @@ public class ClientProperties * by the broker in TuneOK it will be used as the heartbeat interval. * If not a warning will be printed and the max value specified for * heartbeat in TuneOK will be used - * + * * The default idle timeout is set to 120 secs */ public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; public static final long DEFAULT_IDLE_TIMEOUT = 120000; - + public static final String HEARTBEAT = "qpid.heartbeat"; public static final int HEARTBEAT_DEFAULT = 120; - + /** * This value will be used to determine the default destination syntax type. * Currently the two types are Binding URL (java only) and the Addressing format (used by - * all clients). + * all clients). */ public static final String DEST_SYNTAX = "qpid.dest_syntax"; - + public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; - public static final String AMQP_VERSION = "qpid.amqp.version"; + /** + * ========================================================== + * Those properties are used when the io size should be bounded + * ========================================================== + */ - public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id"; + /** + * When set to true the io layer throttle down producers and consumers + * when written or read messages are accumulating and exceeding a certain size. + * This is especially useful when a the producer rate is greater than the network + * speed. + * type: boolean + */ + public static final String PROTECTIO_PROP_NAME = "protectio"; - private static ClientProperties _instance = new ClientProperties(); + //=== The following properties are only used when the previous one is true. + /** + * Max size of read messages that can be stored within the MINA layer + * type: int + */ + public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; + public static final String READ_BUFFER_LIMIT_DEFAULT = "262144"; + /** + * Max size of written messages that can be stored within the MINA layer + * type: int + */ + public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; + public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144"; + public static final String AMQP_VERSION = "qpid.amqp.version"; + + private static ClientProperties _instance = new ClientProperties(); + /* - public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = + public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); - + public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME = QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence"); - - + + public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME = QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */ - - + + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 2b9e2ffaba..39a9beb9e8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -37,10 +37,6 @@ import java.lang.ref.WeakReference; */ public final class AMQShortString implements CharSequence, Comparable<AMQShortString> { - /** - * The maximum number of octets in AMQ short string as defined in AMQP specification - */ - public static final int MAX_LENGTH = 255; private static final byte MINUS = (byte)'-'; private static final byte ZERO = (byte) '0'; @@ -122,19 +118,22 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString(byte[] data) { - if (data == null) - { - throw new NullPointerException("Cannot create AMQShortString with null data[]"); - } - if (data.length > MAX_LENGTH) - { - throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); - } + _data = data.clone(); _length = data.length; _offset = 0; } + public AMQShortString(byte[] data, int pos) + { + final int size = data[pos++]; + final byte[] dataCopy = new byte[size]; + System.arraycopy(data,pos,dataCopy,0,size); + _length = size; + _data = dataCopy; + _offset = 0; + } + public AMQShortString(String data) { this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); @@ -147,12 +146,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { throw new NullPointerException("Cannot create AMQShortString with null char[]"); } - // the current implementation of 0.8/0.9.x short string encoding - // supports only ASCII characters - if (data.length> MAX_LENGTH) - { - throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); - } + final int length = data.length; final byte[] stringBytes = new byte[length]; int hash = 0; @@ -171,17 +165,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt public AMQShortString(CharSequence charSequence) { - if (charSequence == null) - { - // it should be possible to create short string for null data - charSequence = ""; - } - // the current implementation of 0.8/0.9.x short string encoding - // supports only ASCII characters - if (charSequence.length() > MAX_LENGTH) - { - throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); - } final int length = charSequence.length(); final byte[] stringBytes = new byte[length]; int hash = 0; @@ -201,10 +184,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private AMQShortString(ByteBuffer data, final int length) { - if (length > MAX_LENGTH) - { - throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); - } if(data.isDirect() || data.isReadOnly()) { byte[] dataBytes = new byte[length]; @@ -226,17 +205,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt private AMQShortString(final byte[] data, final int from, final int to) { - if (data == null) - { - throw new NullPointerException("Cannot create AMQShortString with null data[]"); - } - int length = to - from; - if (length > MAX_LENGTH) - { - throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); - } _offset = from; - _length = length; + _length = to - from; _data = data; } @@ -275,6 +245,29 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return new CharSubSequence(start, end); } + public int writeToByteArray(byte[] encoding, int pos) + { + final int size = length(); + encoding[pos++] = (byte) size; + System.arraycopy(_data,_offset,encoding,pos,size); + return pos+size; + } + + public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos) + { + + + final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos); + if(shortString.length() == 0) + { + return null; + } + else + { + return shortString; + } + } + public static AMQShortString readFromBuffer(ByteBuffer buffer) { final short length = buffer.getUnsigned(); @@ -697,10 +690,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt size += term.length(); } - if (size > MAX_LENGTH) - { - throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); - } byte[] data = new byte[size]; int pos = 0; final byte[] delimData = delim._data; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 30db3b8be7..83e5a7e341 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -36,7 +36,7 @@ public class ContentHeaderBody implements AMQBody public long bodySize; /** must never be null */ - private ContentHeaderProperties properties; + public ContentHeaderProperties properties; public ContentHeaderBody() { @@ -128,14 +128,4 @@ public class ContentHeaderBody implements AMQBody { return new AMQFrame(channelId, body); } - - public ContentHeaderProperties getProperties() - { - return properties; - } - - public void setProperties(ContentHeaderProperties props) - { - properties = props; - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 48a3df734a..31953ea6ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -22,6 +22,8 @@ package org.apache.qpid.protocol; import java.net.SocketAddress; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Receiver; /** @@ -30,6 +32,9 @@ import org.apache.qpid.transport.Receiver; */ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> { + // Sets the network driver providing data for this ProtocolEngine + void setNetworkDriver (NetworkDriver driver); + // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index 4e40b78440..9df84eef90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.NetworkDriver; public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(NetworkConnection network); + ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java index 30010a2d89..38f60c04fe 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java @@ -23,7 +23,7 @@ package org.apache.qpid.thread; import org.apache.qpid.thread.Threading; -import java.util.concurrent.Executor; +import edu.emory.mathcs.backport.java.util.concurrent.Executor; public class QpidThreadExecutor implements Executor { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index c8b7ad2a5e..0d9f8c0b28 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -247,7 +247,7 @@ public class ClientDelegate extends ConnectionDelegate int i = heartbeat; if (i == 0) { - log.info("Idle timeout is 0 sec. Heartbeats are disabled."); + log.warn("Idle timeout is zero. Heartbeats are disabled"); return 0; // heartbeats are disabled. } else if (i >= min && i <= max) @@ -256,8 +256,8 @@ public class ClientDelegate extends ConnectionDelegate } else { - log.info("The broker does not support the configured connection idle timeout of %s sec," + - " using the brokers max supported value of %s sec instead.", i,max); + log.warn("Ignoring the idle timeout %s set by the connection," + + " using the brokers max value %s", i,max); return max; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7d17397b2d..e5e10c0e07 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -25,10 +25,9 @@ import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; +import static org.apache.qpid.transport.Connection.State.RESUMING; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -40,13 +39,6 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; @@ -120,6 +112,7 @@ public class Connection extends ConnectionInvoker private SaslServer saslServer; private SaslClient saslClient; private int idleTimeout = 0; + private String _authorizationID; private Map<String,Object> _serverProperties; private String userID; private ConnectionSettings conSettings; @@ -241,14 +234,13 @@ public class Connection extends ConnectionInvoker state = OPENING; userID = settings.getUsername(); delegate = new ClientDelegate(settings); - - securityLayer = new SecurityLayer(this); - - OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); - Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this))); - NetworkConnection network = transport.connect(settings, receiver, null); - sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize()); - + + TransportBuilder transport = new TransportBuilder(); + transport.init(this); + this.sender = transport.buildSenderPipe(); + transport.buildReceiverPipe(this); + this.securityLayer = transport.getSecurityLayer(); + send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -474,12 +466,11 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - List <Binary> transactedSessions = new ArrayList(); for (Session ssn : sessions.values()) { if (ssn.isTransacted()) - { - transactedSessions.add(ssn.getName()); + { + removeSession(ssn); ssn.setState(Session.State.CLOSED); } else @@ -489,11 +480,6 @@ public class Connection extends ConnectionInvoker ssn.resume(); } } - - for (Binary ssn_name : transactedSessions) - { - sessions.remove(ssn_name); - } setState(OPEN); } } @@ -659,6 +645,16 @@ public class Connection extends ConnectionInvoker return idleTimeout; } + public void setAuthorizationID(String authorizationID) + { + _authorizationID = authorizationID; + } + + public String getAuthorizationID() + { + return _authorizationID; + } + public String getUserID() { return userID; @@ -699,8 +695,4 @@ public class Connection extends ConnectionInvoker return connectionLost.get(); } - protected Collection<Session> getChannels() - { - return channels.values(); - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index f183c1e241..88dd2d6afa 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -95,7 +95,6 @@ public abstract class ConnectionDelegate Session ssn = conn.getSession(dtc.getChannel()); if (ssn != null) { - ssn.setDetachCode(dtc.getCode()); conn.unmap(ssn); ssn.closed(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 2074c77a5b..08678b213b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -30,8 +30,6 @@ import java.util.Map; */ public class ConnectionSettings { - public static final String WILDCARD_ADDRESS = "*"; - String protocol = "tcp"; String host = "localhost"; String vhost; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java new file mode 100644 index 0000000000..86af97bf7e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.SocketAddress; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; + +public interface NetworkDriver extends Sender<java.nio.ByteBuffer> +{ + // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to + // it using the SSLContextFactory if provided + void open(int port, InetAddress destination, ProtocolEngine engine, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) + throws OpenException; + + // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which + // processes incoming connections with ProtocolEngines and SSLEngines created from the factories + // (in the case of an SSLContextFactory, if provided) + void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; + + // Returns the remote address of the underlying socket + SocketAddress getRemoteAddress(); + + // Returns the local address of the underlying socket + SocketAddress getLocalAddress(); + + /** + * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been + * read in seconds + */ + void setMaxReadIdle(int idleTime); + + /** + * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been + * written in seconds + */ + void setMaxWriteIdle(int idleTime); + +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java index 8d3f7a779a..c38afe5dd5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java @@ -25,22 +25,20 @@ package org.apache.qpid.transport; * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned * from here if the underlying implementation supports them. */ -public interface NetworkTransportConfiguration +public interface NetworkDriverConfiguration { // Taken from Socket + Boolean getKeepAlive(); + Boolean getOOBInline(); + Boolean getReuseAddress(); + Integer getSoLinger(); // null means off + Integer getSoTimeout(); Boolean getTcpNoDelay(); + Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - Integer getSendBufferSize(); - - Integer getPort(); - - String getHost(); - - String getTransport(); - - Integer getConnectorProcessors(); -} + Integer getSendBufferSize(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 11af86f412..f21df251da 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -75,7 +75,10 @@ public class ServerDelegate extends ConnectionDelegate if (mechanism == null || mechanism.length() == 0) { - tuneAuthorizedConnection(conn); + conn.connectionTune + (getChannelMax(), + org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, + 0, getHeartbeatMax()); return; } @@ -94,7 +97,8 @@ public class ServerDelegate extends ConnectionDelegate } catch (SaslException e) { - connectionAuthFailed(conn, e); + conn.exception(e); + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); } } @@ -105,52 +109,33 @@ public class ServerDelegate extends ConnectionDelegate return ss; } - protected void secure(final SaslServer ss, final Connection conn, final byte[] response) + private void secure(Connection conn, byte[] response) { + SaslServer ss = conn.getSaslServer(); try { byte[] challenge = ss.evaluateResponse(response); if (ss.isComplete()) { ss.dispose(); - tuneAuthorizedConnection(conn); + conn.connectionTune + (getChannelMax(), + org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, + 0, getHeartbeatMax()); + conn.setAuthorizationID(ss.getAuthorizationID()); } else { - connectionAuthContinue(conn, challenge); + conn.connectionSecure(challenge); } } catch (SaslException e) { - connectionAuthFailed(conn, e); + conn.exception(e); + conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); } } - protected void connectionAuthFailed(final Connection conn, Exception e) - { - conn.exception(e); - conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()); - } - - protected void connectionAuthContinue(final Connection conn, byte[] challenge) - { - conn.connectionSecure(challenge); - } - - protected void tuneAuthorizedConnection(final Connection conn) - { - conn.connectionTune - (getChannelMax(), - org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, getHeartbeatMax()); - } - - protected void secure(final Connection conn, final byte[] response) - { - final SaslServer ss = conn.getSaslServer(); - secure(ss, conn, response); - } - protected int getHeartbeatMax() { return 0xFFFF; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index e0c6cb29d3..214d4534c1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -42,10 +42,7 @@ import static org.apache.qpid.util.Serial.max; import static org.apache.qpid.util.Strings.toUTF8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -120,9 +117,7 @@ public class Session extends SessionInvoker private Thread resumer = null; private boolean transacted = false; - private SessionDetachCode detachCode; - private final Object stateLock = new Object(); - + protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); @@ -267,32 +262,7 @@ public class Session extends SessionInvoker } else if (m instanceof MessageTransfer) { - MessageTransfer xfr = (MessageTransfer)m; - - if (xfr.getHeader() != null) - { - if (xfr.getHeader().get(DeliveryProperties.class) != null) - { - xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true); - } - else - { - Struct[] structs = xfr.getHeader().getStructs(); - DeliveryProperties deliveryProps = new DeliveryProperties(); - deliveryProps.setRedelivered(true); - - List<Struct> list = Arrays.asList(structs); - list.add(deliveryProps); - xfr.setHeader(new Header(list)); - } - - } - else - { - DeliveryProperties deliveryProps = new DeliveryProperties(); - deliveryProps.setRedelivered(true); - xfr.setHeader(new Header(deliveryProps)); - } + ((MessageTransfer)m).getHeader().get(DeliveryProperties.class).setRedelivered(true); } sessionCommandPoint(m.getId(), 0); send(m); @@ -452,10 +422,7 @@ public class Session extends SessionInvoker { return; } - if (copy.size() > 0) - { - sessionCompleted(copy, options); - } + sessionCompleted(copy, options); } } @@ -694,12 +661,7 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - - boolean replayTransfer = !closing && !transacted && - m instanceof MessageTransfer && - ! m.isUnreliable(); - - if ((replayTransfer) || m.hasCompletionListener()) + if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener()) { commands[mod(next, commands.length)] = m; commandBytes += m.getBodySize(); @@ -1033,8 +995,7 @@ public class Session extends SessionInvoker if(state == CLOSED) { - connection.removeSession(this); - listener.closed(this); + connection.removeSession(this); } } @@ -1047,54 +1008,13 @@ public class Session extends SessionInvoker { return String.format("ssn:%s", name); } - + public void setTransacted(boolean b) { this.transacted = b; } - + public boolean isTransacted(){ return transacted; } - - public void setDetachCode(SessionDetachCode dtc) - { - this.detachCode = dtc; - } - - public SessionDetachCode getDetachCode() - { - return this.detachCode; - } - - public void awaitOpen() - { - switch (state) - { - case NEW: - synchronized(stateLock) - { - Waiter w = new Waiter(stateLock, timeout); - while (w.hasTime() && state == NEW) - { - w.await(); - } - } - - if (state != OPEN) - { - throw new SessionException("Timed out waiting for Session to open"); - } - case DETACHED: - case CLOSING: - case CLOSED: - throw new SessionException("Session closed"); - default : - break; - } - } - - public Object getStateLock() - { - return stateLock; - } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 3341149e5f..5d8e4d5565 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -76,10 +76,6 @@ public class SessionDelegate @Override public void sessionAttached(Session ssn, SessionAttached atc) { ssn.setState(Session.State.OPEN); - synchronized (ssn.getStateLock()) - { - ssn.getStateLock().notifyAll(); - } } @Override public void sessionTimeout(Session ssn, SessionTimeout t) @@ -206,19 +202,11 @@ public class SessionDelegate public void closed(Session session) { - log.debug("CLOSED: [%s]", session); - synchronized (session.getStateLock()) - { - session.getStateLock().notifyAll(); - } + log.warn("CLOSED: [%s]", session); } public void detached(Session session) { - log.debug("DETACHED: [%s]", session); - synchronized (session.getStateLock()) - { - session.getStateLock().notifyAll(); - } + log.warn("DETACHED: [%s]", session); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java deleted file mode 100644 index 2c7652abeb..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.qpid.transport; - -import org.apache.mina.common.IoConnector; - -public interface SocketConnectorFactory -{ - IoConnector newConnector(); -}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java new file mode 100644 index 0000000000..c08909c6e4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.security.SecurityLayer; + +public class TransportBuilder +{ + private Connection con; + private ConnectionSettings settings; + private NetworkTransport transport; + private SecurityLayer securityLayer = new SecurityLayer(); + + public void init(Connection con) throws TransportException + { + this.con = con; + this.settings = con.getConnectionSettings(); + transport = Transport.getTransport(); + transport.init(settings); + securityLayer.init(con); + } + + public Sender<ProtocolEvent> buildSenderPipe() + { + ConnectionSettings settings = con.getConnectionSettings(); + + // Io layer + Sender<ByteBuffer> sender = transport.sender(); + + // Security layer + sender = securityLayer.sender(sender); + + Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize()); + return dis; + } + + public void buildReceiverPipe(Receiver<ProtocolEvent> delegate) + { + Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate)); + + // Security layer + receiver = securityLayer.receiver(receiver); + + //Io layer + transport.receiver(receiver); + } + + public SecurityLayer getSecurityLayer() + { + return securityLayer; + } + +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 0ccfcfcb70..908d14a307 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -63,7 +63,6 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(Double.class, Type.DOUBLE); ENCODINGS.put(Character.class, Type.CHAR); ENCODINGS.put(byte[].class, Type.VBIN32); - ENCODINGS.put(UUID.class, Type.UUID); } private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java deleted file mode 100644 index 7099916c33..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network; - -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkTransportConfiguration; - -public interface IncomingNetworkTransport extends NetworkTransport -{ - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory); -}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java index 4610c2351e..5e12d7e7c6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java @@ -20,13 +20,19 @@ */ package org.apache.qpid.transport.network; -/** - * A network transport is responsible for the establishment of network connections. - * NetworkTransport implementations are pluggable via the {@link Transport} class. - */ +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.ConnectionSettings; + public interface NetworkTransport { + public void init(ConnectionSettings settings); + + public Sender<ByteBuffer> sender(); + + public void receiver(Receiver<ByteBuffer> delegate); + public void close(); - - public NetworkConnection getConnection(); -} +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index 2c10a30f10..f0bf04d04f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,129 +7,50 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ -package org.apache.qpid.transport.network; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +package org.apache.qpid.transport.network; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.transport.TransportException; public class Transport -{ - public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport"; - public static final String QPID_TRANSPORT_V0_8_PROPNAME = "qpid.transport.v0_8"; - public static final String QPID_TRANSPORT_V0_9_PROPNAME = "qpid.transport.v0_9"; - public static final String QPID_TRANSPORT_V0_9_1_PROPNAME = "qpid.transport.v0_9_1"; - public static final String QPID_TRANSPORT_V0_10_PROPNAME = "qpid.transport.v0_10"; - public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport"; - - // Can't reference the class directly here, as this would preclude the ability to bundle transports separately. - private static final String MINA_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; - private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport"; - - public static final String TCP = "tcp"; - - private final static Map<ProtocolVersion,String> OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP; - - static - { - final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>(); - map.put(ProtocolVersion.v8_0, MINA_TRANSPORT_CLASSNAME); - map.put(ProtocolVersion.v0_9, MINA_TRANSPORT_CLASSNAME); - map.put(ProtocolVersion.v0_91, MINA_TRANSPORT_CLASSNAME); - map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME); - - OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map); - } - - public static IncomingNetworkTransport getIncomingTransportInstance() +{ + private final static Class<?> transportClass; + + static { - return (IncomingNetworkTransport) loadTransportClass( - System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, MINA_TRANSPORT_CLASSNAME)); - } - - public static OutgoingNetworkTransport getOutgoingTransportInstance( - final ProtocolVersion protocolVersion) - { - - final String overrride = getOverrideClassNameFromSystemProperty(protocolVersion); - final String networkTransportClassName; - if (overrride != null) - { - networkTransportClassName = overrride; - } - else - { - networkTransportClassName = OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP.get(protocolVersion); - } - - return (OutgoingNetworkTransport) loadTransportClass(networkTransportClassName); - } - - private static NetworkTransport loadTransportClass(final String networkTransportClassName) - { - if (networkTransportClassName == null) - { - throw new IllegalArgumentException("transport class name must not be null"); - } - try { - final Class<?> clazz = Class.forName(networkTransportClassName); - return (NetworkTransport) clazz.newInstance(); + transportClass = + Class.forName(System.getProperty("qpid.transport", + "org.apache.qpid.transport.network.io.IoNetworkTransport")); + } - catch (InstantiationException e) + catch(Exception e) { - throw new TransportException("Unable to instantiate transport class " + networkTransportClassName, e); - } - catch (IllegalAccessException e) - { - throw new TransportException("Access exception " + networkTransportClassName, e); - } - catch (ClassNotFoundException e) - { - throw new TransportException("Unable to load transport class " + networkTransportClassName, e); + throw new Error("Error occured while loading Qpid Transport",e); } } - - private static String getOverrideClassNameFromSystemProperty(final ProtocolVersion protocolVersion) + + public static NetworkTransport getTransport() throws TransportException { - final String protocolSpecificSystemProperty; - - if (ProtocolVersion.v0_10.equals(protocolVersion)) - { - protocolSpecificSystemProperty = QPID_TRANSPORT_V0_10_PROPNAME; - } - else if (ProtocolVersion.v0_91.equals(protocolVersion)) - { - protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_1_PROPNAME; - } - else if (ProtocolVersion.v0_9.equals(protocolVersion)) - { - protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_PROPNAME; - } - else if (ProtocolVersion.v8_0.equals(protocolVersion)) + try { - protocolSpecificSystemProperty = QPID_TRANSPORT_V0_8_PROPNAME; + return (NetworkTransport)transportClass.newInstance(); } - else + catch (Exception e) { - throw new IllegalArgumentException("Unknown ProtocolVersion " + protocolVersion); + throw new TransportException("Error while creating a new transport instance",e); } - - return System.getProperty(protocolSpecificSystemProperty, System.getProperty(QPID_TRANSPORT_PROPNAME)); } -} +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java new file mode 100644 index 0000000000..ecc5f6d07c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java @@ -0,0 +1,130 @@ +package org.apache.qpid.transport.network.io; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.BodyFactory; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentBodyFactory; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderBodyFactory; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.HeartbeatBodyFactory; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Receiver; + +public class InputHandler_0_9 implements Receiver<ByteBuffer> +{ + + private AMQVersionAwareProtocolSession _session; + private MethodRegistry _registry; + private BodyFactory bodyFactory; + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + public InputHandler_0_9(AMQVersionAwareProtocolSession session) + { + _session = session; + _registry = _session.getMethodRegistry(); + } + + public void closed() + { + // AS FIXME: implement + } + + public void exception(Throwable t) + { + // TODO: propogate exception to things + t.printStackTrace(); + } + + public void received(ByteBuffer buf) + { + org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf); + try + { + final byte type = in.get(); + if (type == AMQMethodBody.TYPE) + { + bodyFactory = new AMQMethodBodyFactory(_session); + } + else + { + bodyFactory = _bodiesSupported[type]; + } + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + } + + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize, null); + } + + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type, null); + } + + try + { + frame.getBodyFrame().handle(frame.getChannel(), _session); + } + catch (AMQException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + catch (AMQFrameDecodingException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java new file mode 100644 index 0000000000..8530240dcc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.TransportException; + +import java.io.IOException; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; + +import java.nio.ByteBuffer; + + +/** + * IoAcceptor + * + */ + +public class IoAcceptor<E> extends Thread +{ + + + private ServerSocket socket; + private Binding<E,ByteBuffer> binding; + + public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) + throws IOException + { + socket = new ServerSocket(); + socket.setReuseAddress(true); + socket.bind(address); + this.binding = binding; + + setName(String.format("IoAcceptor - %s", socket.getInetAddress())); + } + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() throws IOException + { + if (!socket.isClosed()) + { + socket.close(); + } + } + + public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) + throws IOException + { + this(new InetSocketAddress(host, port), binding); + } + + public void run() + { + while (true) + { + try + { + Socket sock = socket.accept(); + IoTransport<E> transport = new IoTransport<E>(sock, binding,false); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java index ff86ba481f..69b3a0ce45 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,26 +7,29 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ -package org.apache.qpid.transport.network; +package org.apache.qpid.transport.network.io; +import java.net.Socket; import java.nio.ByteBuffer; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; -public interface OutgoingNetworkTransport extends NetworkTransport +public interface IoContext { - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory); -}
\ No newline at end of file + Sender<ByteBuffer> getSender(); + + IoReceiver getReceiver(); + + Socket getSocket(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java deleted file mode 100644 index cca1fc46c9..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ /dev/null @@ -1,93 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class IoNetworkConnection implements NetworkConnection -{ - private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class); - private final Socket _socket; - private final long _timeout; - private final IoSender _ioSender; - private final IoReceiver _ioReceiver; - - public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, - int sendBufferSize, int receiveBufferSize, long timeout) - { - _socket = socket; - _timeout = timeout; - - _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); - _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); - _ioSender.registerCloseListener(_ioReceiver); - - _ioReceiver.initiate(); - _ioSender.initiate(); - } - - public Sender<ByteBuffer> getSender() - { - return _ioSender; - } - - public void close() - { - try - { - _ioSender.close(); - } - finally - { - _ioReceiver.close(false); - } - } - - public SocketAddress getRemoteAddress() - { - return _socket.getRemoteSocketAddress(); - } - - public SocketAddress getLocalAddress() - { - return _socket.getLocalSocketAddress(); - } - - public void setMaxWriteIdle(int sec) - { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender - } - - public void setMaxReadIdle(int sec) - { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index d611ab1cf3..dd6a37eca2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -27,15 +27,14 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.NetworkTransport; import org.apache.qpid.transport.util.Logger; -public class IoNetworkTransport implements OutgoingNetworkTransport +public class IoNetworkTransport implements NetworkTransport, IoContext { static { @@ -45,31 +44,34 @@ public class IoNetworkTransport implements OutgoingNetworkTransport (Boolean.getBoolean("amqj.enableDirectBuffers")); } - private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); + private static final Logger log = Logger.get(IoNetworkTransport.class); - private Socket _socket; - private IoNetworkConnection _connection; - private long _timeout = 60000; + private Socket socket; + private Sender<ByteBuffer> sender; + private IoReceiver receiver; + private long timeout = 60000; + private ConnectionSettings settings; - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory) + public void init(ConnectionSettings settings) { - int sendBufferSize = settings.getWriteBufferSize(); - int receiveBufferSize = settings.getReadBufferSize(); - try { - _socket = new Socket(); - _socket.setReuseAddress(true); - _socket.setTcpNoDelay(settings.isTcpNodelay()); - _socket.setSendBufferSize(sendBufferSize); - _socket.setReceiveBufferSize(receiveBufferSize); + this.settings = settings; + InetAddress address = InetAddress.getByName(settings.getHost()); + socket = new Socket(); + socket.setReuseAddress(true); + socket.setTcpNoDelay(settings.isTcpNodelay()); - LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize()); + log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); - InetAddress address = InetAddress.getByName(settings.getHost()); + socket.setSendBufferSize(settings.getWriteBufferSize()); + socket.setReceiveBufferSize(settings.getReadBufferSize()); - _socket.connect(new InetSocketAddress(address, settings.getPort())); + log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.connect(new InetSocketAddress(address, settings.getPort())); } catch (SocketException e) { @@ -79,35 +81,36 @@ public class IoNetworkTransport implements OutgoingNetworkTransport { throw new TransportException("Error connecting to broker", e); } + } - try - { - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); - } - catch(Exception e) - { - try - { - _socket.close(); - } - catch(IOException ioe) - { - //ignored, throw based on original exception - } - - throw new TransportException("Error creating network connection", e); - } + public void receiver(Receiver<ByteBuffer> delegate) + { + receiver = new IoReceiver(this, delegate, + 2*settings.getReadBufferSize() , timeout); + } - return _connection; + public Sender<ByteBuffer> sender() + { + return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); } public void close() { - _connection.close(); + + } + + public Sender<ByteBuffer> getSender() + { + return sender; + } + + public IoReceiver getReceiver() + { + return receiver; } - public NetworkConnection getConnection() + public Socket getSocket() { - return _connection; + return socket; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index fea87fc350..19a683d505 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; @@ -38,54 +37,43 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver implements Runnable, Closeable +final class IoReceiver implements Runnable { private static final Logger log = Logger.get(IoReceiver.class); + private final IoContext ioCtx; private final Receiver<ByteBuffer> receiver; private final int bufferSize; private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; - private static final boolean shutdownBroken; - static - { - String osName = System.getProperty("os.name"); - shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*"); - } + private final boolean shutdownBroken = + ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); - public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) + public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver, + int bufferSize, long timeout) { + this.ioCtx = ioCtx; this.receiver = receiver; this.bufferSize = bufferSize; - this.socket = socket; + this.socket = ioCtx.getSocket(); this.timeout = timeout; try { - //Create but deliberately don't start the thread. receiverThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { - throw new RuntimeException("Error creating IOReceiver thread",e); + throw new Error("Error creating IOReceiver thread",e); } receiverThread.setDaemon(true); receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - } - - public void initiate() - { receiverThread.start(); } - public void close() - { - close(false); - } - void close(boolean block) { if (!closed.getAndSet(true)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 1bb515624c..66b97e8225 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,11 +24,8 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; @@ -46,6 +43,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> // we can test other cases as well private final static int START = Integer.MAX_VALUE - 10; + private final IoContext ioCtx; private final long timeout; private final Socket socket; private final OutputStream out; @@ -58,13 +56,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; - private final List<Closeable> _listeners = new ArrayList<Closeable>(); private volatile Throwable exception = null; - public IoSender(Socket socket, int bufferSize, long timeout) + + public IoSender(IoContext ioCtx, int bufferSize, long timeout) { - this.socket = socket; + this.ioCtx = ioCtx; + this.socket = ioCtx.getSocket(); this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; @@ -79,7 +78,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - //Create but deliberately don't start the thread. senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) @@ -89,10 +87,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> senderThread.setDaemon(true); senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - } - - public void initiate() - { senderThread.start(); } @@ -210,20 +204,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> senderThread.join(timeout); if (senderThread.isAlive()) { - log.error("join timed out"); throw new SenderException("join timed out"); } } + ioCtx.getReceiver().close(false); } catch (InterruptedException e) { - log.error("interrupted whilst waiting for sender thread to stop"); throw new SenderException(e); } - finally - { - closeListeners(); - } + if (reportException && exception != null) { throw new SenderException(exception); @@ -231,28 +221,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - private void closeListeners() - { - Exception ex = null; - for(Closeable listener : _listeners) - { - try - { - listener.close(); - } - catch(Exception e) - { - log.error("Exception closing listener: " + e.getMessage()); - ex = e; - } - } - - if (ex != null) - { - throw new SenderException(ex.getMessage(), ex); - } - } - public void run() { final int size = buffer.length; @@ -336,9 +304,4 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> throw new SenderException(e); } } - - public void registerCloseListener(Closeable listener) - { - _listeners.add(listener); - } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java new file mode 100644 index 0000000000..bfdbb34978 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.security.ssl.SSLReceiver; +import org.apache.qpid.transport.network.security.ssl.SSLSender; +import org.apache.qpid.transport.util.Logger; + +/** + * This class provides a socket based transport using the java.io + * classes. + * + * The following params are configurable via JVM arguments + * TCP_NO_DELAY - amqj.tcpNoDelay + * SO_RCVBUF - amqj.receiveBufferSize + * SO_SNDBUF - amqj.sendBufferSize + */ +public final class IoTransport<E> implements IoContext +{ + + static + { + org.apache.mina.common.ByteBuffer.setAllocator + (new org.apache.mina.common.SimpleByteBufferAllocator()); + org.apache.mina.common.ByteBuffer.setUseDirectBuffers + (Boolean.getBoolean("amqj.enableDirectBuffers")); + } + + private static final Logger log = Logger.get(IoTransport.class); + + private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; + private static int readBufferSize = Integer.getInteger + ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + private static int writeBufferSize = Integer.getInteger + ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + + private Socket socket; + private Sender<ByteBuffer> sender; + private E endpoint; + private IoReceiver receiver; + private long timeout = 60000; + + IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl) + { + this.socket = socket; + + if (ssl) + { + SSLEngine engine = null; + SSLContext sslCtx; + try + { + sslCtx = createSSLContext(); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + try + { + engine = sslCtx.createSSLEngine(); + engine.setUseClientMode(true); + } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + + this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout)); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), + 2*readBufferSize, timeout); + + log.info("SSL Sender and Receiver initiated"); + } + else + { + this.sender = new IoSender(this, 2*writeBufferSize, timeout); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, binding.receiver(endpoint), + 2*readBufferSize, timeout); + } + } + + public Sender<ByteBuffer> getSender() + { + return sender; + } + + public IoReceiver getReceiver() + { + return receiver; + } + + public Socket getSocket() + { + return socket; + } + + public static final <E> E connect(String host, int port, + Binding<E,ByteBuffer> binding, + boolean ssl) + { + Socket socket = createSocket(host, port); + IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl); + return transport.endpoint; + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate, + boolean ssl) + { + return connect(host, port, ConnectionBinding.get(delegate),ssl); + } + + public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl) + { + connect(host, port, new Binding_0_9(session),ssl); + } + + private static class Binding_0_9 + implements Binding<AMQVersionAwareProtocolSession,ByteBuffer> + { + + private AMQVersionAwareProtocolSession session; + + Binding_0_9(AMQVersionAwareProtocolSession session) + { + this.session = session; + } + + public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender) + { + session.setSender(sender); + return session; + } + + public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn) + { + return new InputHandler_0_9(ssn); + } + + } + + private static Socket createSocket(String host, int port) + { + try + { + InetAddress address = InetAddress.getByName(host); + Socket socket = new Socket(); + socket.setReuseAddress(true); + socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + + log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.setSendBufferSize(writeBufferSize); + socket.setReceiveBufferSize(readBufferSize); + + log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.connect(new InetSocketAddress(address, port)); + return socket; + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + } + + private SSLContext createSSLContext() throws Exception + { + String trustStorePath = System.getProperty("javax.net.ssl.trustStore"); + String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); + String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509"); + + String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath); + String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword); + String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509"); + + SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword, + trustStoreCertType,keyStorePath, + keyStorePassword,keyStoreCertType); + + return sslContextFactory.buildServerContext(); + + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java new file mode 100644 index 0000000000..0f2c0d0226 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -0,0 +1,435 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExecutorThreadModel; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.SessionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.OpenException; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver +{ + + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + + ProtocolEngine _protocolEngine; + private boolean _useNIO = false; + private int _processors = 4; + private boolean _executorPool = false; + private SSLContextFactory _sslFactory = null; + private IoConnector _socketConnector; + private IoAcceptor _acceptor; + private IoSession _ioSession; + private ProtocolEngineFactory _factory; + private boolean _protectIO; + private NetworkDriverConfiguration _config; + private Throwable _lastException; + private boolean _acceptingConnections = false; + + private WriteFuture _lastWriteFuture; + + private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); + + static + { + org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + + //override the MINA defaults to prevent use of the PooledByteBufferAllocator + org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + } + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO, + ProtocolEngine protocolEngine, IoSession session) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + _protocolEngine = protocolEngine; + _ioSession = session; + _ioSession.setAttachment(_protocolEngine); + } + + public MINANetworkDriver() + { + + } + + public MINANetworkDriver(IoConnector ioConnector) + { + _socketConnector = ioConnector; + } + + public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) + { + _socketConnector = ioConnector; + _protocolEngine = engine; + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + _factory = factory; + _config = config; + + if (_useNIO) + { + _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, + new NewThreadExecutor()); + } + else + { + _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); + } + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + + if (config != null) + { + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getSendBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); + } + + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + + if (addresses != null && addresses.length > 0) + { + for (InetAddress addr : addresses) + { + try + { + _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to %1s:%2s", addr, port)); + } + } + } + else + { + try + { + _acceptor.bind(new InetSocketAddress(port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to *:%1s", port)); + } + } + _acceptingConnections = true; + } + + public SocketAddress getRemoteAddress() + { + return _ioSession.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _ioSession.getLocalAddress(); + } + + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + SSLContextFactory sslFactory) throws OpenException + { + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + + if (_useNIO) + { + _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); + } + else + { + _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking + // connector + } + + SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); + String s = ""; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for(StackTraceElement elt : trace) + { + if(elt.getClassName().contains("Test")) + { + s = elt.getClassName(); + break; + } + } + cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s)); + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); + scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); + scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); + + // Don't have the connector's worker thread wait around for other + // connections (we only use + // one SocketConnector per connection at the moment anyway). This allows + // short-running + // clients (like unit tests) to complete quickly. + if (_socketConnector instanceof SocketConnector) + { + ((SocketConnector) _socketConnector).setWorkerTimeout(0); + } + + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); + future.join(); + if (!future.isConnected()) + { + throw new OpenException("Could not open connection", _lastException); + } + _ioSession = future.getSession(); + _ioSession.setAttachment(engine); + engine.setNetworkDriver(this); + _protocolEngine = engine; + } + + public void setMaxReadIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); + } + + public void setMaxWriteIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); + } + + public void close() + { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + if (_ioSession != null) + { + _ioSession.close(); + } + } + + public void flush() + { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } + } + + public void send(ByteBuffer msg) + { + org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity()); + minaBuf.put(msg); + minaBuf.flip(); + _lastWriteFuture = _ioSession.write(minaBuf); + } + + public void setIdleTimeout(int i) + { + // MINA doesn't support setting SO_TIMEOUT + } + + public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception + { + if (_protocolEngine != null) + { + _protocolEngine.exception(throwable); + } + else + { + _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); + } + _lastException = throwable; + } + + /** + * Invoked when a message is received on a particular protocol session. Note + * that a protocol session is directly tied to a particular physical + * connection. + * + * @param protocolSession + * the protocol session that received the message + * @param message + * the message itself (i.e. a decoded frame) + * + * @throws Exception + * if the message cannot be processed + */ + public void messageReceived(IoSession protocolSession, Object message) throws Exception + { + if (message instanceof org.apache.mina.common.ByteBuffer) + { + ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); + } + else + { + throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); + } + } + + public void sessionClosed(IoSession protocolSession) throws Exception + { + ((ProtocolEngine) protocolSession.getAttachment()).closed(); + } + + public void sessionCreated(IoSession protocolSession) throws Exception + { + // Configure the session with SSL if necessary + SessionUtil.initialize(protocolSession); + if (_executorPool) + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + else + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + // Do we want to have read/write buffer limits? + if (_protectIO) + { + //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + } + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + + if (_acceptingConnections) + { + // Set up the protocol engine + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); + MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); + protocolEngine.setNetworkDriver(newDriver); + } + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (IdleStatus.WRITER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).writerIdle(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).readerIdle(); + } + } + + private ProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + + public void setProtocolEngine(ProtocolEngine protocolEngine) + { + _protocolEngine = protocolEngine; + if (_ioSession != null) + { + _ioSession.setAttachment(protocolEngine); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java new file mode 100644 index 0000000000..b89eed48b0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -0,0 +1,274 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.*; + +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.executor.ExecutorFilter; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.ConnectionBinding; + +import org.apache.qpid.transport.util.Logger; + +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; + +import static org.apache.qpid.transport.util.Functions.*; + +/** + * MinaHandler + * + * @author Rafael H. Schloming + */ +//RA making this public until we sort out the package issues +public class MinaHandler<E> implements IoHandler +{ + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + /** Default buffer size for pending messages writes */ + private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; + private static final int MAX_RCVBUF = 64*1024; + + private static final Logger log = Logger.get(MinaHandler.class); + + static + { + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + } + + private final Binding<E,java.nio.ByteBuffer> binding; + + private MinaHandler(Binding<E,java.nio.ByteBuffer> binding) + { + this.binding = binding; + } + + public void messageReceived(IoSession ssn, Object obj) + { + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + ByteBuffer buf = (ByteBuffer) obj; + try + { + attachment.receiver.received(buf.buf()); + } + catch (Throwable t) + { + log.error(t, "exception handling buffer %s", str(buf.buf())); + throw new RuntimeException(t); + } + } + + public void messageSent(IoSession ssn, Object obj) + { + // do nothing + } + + public void exceptionCaught(IoSession ssn, Throwable e) + { + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + attachment.receiver.exception(e); + } + + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an optional protectio + * + * @param session The MINA session. + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ + public void sessionCreated(IoSession session) throws Exception + { + log.debug("Protocol session created for session " + System.identityHashCode(session)); + + if (Boolean.getBoolean("protectio")) + { + try + { + //Add IO Protection Filters + IoFilterChain chain = session.getFilterChain(); + + session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize( + Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize( + Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); + writefilter.attach(chain); + session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + + log.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } + } + + public void sessionOpened(final IoSession ssn) + { + log.debug("opened: %s", this); + E endpoint = binding.endpoint(new MinaSender(ssn)); + Attachment<E> attachment = + new Attachment<E>(endpoint, binding.receiver(endpoint)); + + // We need to synchronize and notify here because the MINA + // connect future returns the session prior to the attachment + // being set. This is arguably a bug in MINA. + synchronized (ssn) + { + ssn.setAttachment(attachment); + ssn.notifyAll(); + } + } + + public void sessionClosed(IoSession ssn) + { + log.debug("closed: %s", ssn); + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + attachment.receiver.closed(); + ssn.setAttachment(null); + } + + public void sessionIdle(IoSession ssn, IdleStatus status) + { + // do nothing + } + + private static class Attachment<E> + { + + E endpoint; + Receiver<java.nio.ByteBuffer> receiver; + + Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver) + { + this.endpoint = endpoint; + this.receiver = receiver; + } + } + + public static final void accept(String host, int port, + Binding<?,java.nio.ByteBuffer> binding) + throws IOException + { + accept(new InetSocketAddress(host, port), binding); + } + + public static final <E> void accept(SocketAddress address, + Binding<E,java.nio.ByteBuffer> binding) + throws IOException + { + IoAcceptor acceptor = new SocketAcceptor(); + acceptor.bind(address, new MinaHandler<E>(binding)); + } + + public static final <E> E connect(String host, int port, + Binding<E,java.nio.ByteBuffer> binding) + { + return connect(new InetSocketAddress(host, port), binding); + } + + public static final <E> E connect(SocketAddress address, + Binding<E,java.nio.ByteBuffer> binding) + { + MinaHandler<E> handler = new MinaHandler<E>(binding); + SocketConnector connector = new SocketConnector(); + IoServiceConfig acceptorConfig = connector.getDefaultConfig(); + acceptorConfig.setThreadModel(ThreadModel.MANUAL); + SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig(); + scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize"); + if (sendBufferSize != null && sendBufferSize > 0) + { + scfg.setSendBufferSize(sendBufferSize); + } + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize"); + if (receiveBufferSize != null && receiveBufferSize > 0) + { + scfg.setReceiveBufferSize(receiveBufferSize); + } + else if (scfg.getReceiveBufferSize() > MAX_RCVBUF) + { + scfg.setReceiveBufferSize(MAX_RCVBUF); + } + connector.setWorkerTimeout(0); + ConnectFuture cf = connector.connect(address, handler); + cf.join(); + IoSession ssn = cf.getSession(); + + // We need to synchronize and wait here because the MINA + // connect future returns the session prior to the attachment + // being set. This is arguably a bug in MINA. + synchronized (ssn) + { + while (ssn.getAttachment() == null) + { + try + { + ssn.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); + return attachment.endpoint; + } + + public static final void accept(String host, int port, + ConnectionDelegate delegate) + throws IOException + { + accept(host, port, ConnectionBinding.get(delegate)); + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + return connect(host, port, ConnectionBinding.get(delegate)); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java deleted file mode 100644 index 0f433f6eeb..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java +++ /dev/null @@ -1,81 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - -public class MinaNetworkConnection implements NetworkConnection -{ - private IoSession _session; - private Sender<ByteBuffer> _sender; - - public MinaNetworkConnection(IoSession session) - { - _session = session; - _sender = new MinaSender(_session); - } - - public Sender<ByteBuffer> getSender() - { - return _sender; - } - - public void close() - { - _session.close(); - } - - public SocketAddress getRemoteAddress() - { - return _session.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _session.getLocalAddress(); - } - - public long getReadBytes() - { - return _session.getReadBytes(); - } - - public long getWrittenBytes() - { - return _session.getWrittenBytes(); - } - - public void setMaxWriteIdle(int sec) - { - _session.setIdleTime(IdleStatus.WRITER_IDLE, sec); - } - - public void setMaxReadIdle(int sec) - { - _session.setIdleTime(IdleStatus.READER_IDLE, sec); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java deleted file mode 100644 index c00187480c..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.util.SessionUtil; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MinaNetworkHandler extends IoHandlerAdapter -{ - private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class); - - private ProtocolEngineFactory _factory; - private SSLContextFactory _sslFactory = null; - - static - { - boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers"); - LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers"); - ByteBuffer.setUseDirectBuffers(directBuffers); - - //override the MINA defaults to prevent use of the PooledByteBufferAllocator - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory) - { - _sslFactory = sslFactory; - _factory = factory; - } - - public MinaNetworkHandler(SSLContextFactory sslFactory) - { - this(sslFactory, null); - } - - public void messageReceived(IoSession session, Object message) - { - ProtocolEngine engine = (ProtocolEngine) session.getAttachment(); - ByteBuffer buf = (ByteBuffer) message; - try - { - engine.received(buf.buf()); - } - catch (RuntimeException re) - { - engine.exception(re); - } - } - - public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception - { - ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); - if(engine != null) - { - LOGGER.error("Exception caught by Mina", throwable); - engine.exception(throwable); - } - else - { - LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable); - } - } - - public void sessionCreated(IoSession ioSession) throws Exception - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Created session: " + ioSession.getRemoteAddress()); - } - - SessionUtil.initialize(ioSession); - - if (_sslFactory != null) - { - ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - - if (_factory != null) - { - NetworkConnection netConn = new MinaNetworkConnection(ioSession); - - ProtocolEngine engine = _factory.newProtocolEngine(netConn); - ioSession.setAttachment(engine); - } - } - - public void sessionClosed(IoSession ioSession) throws Exception - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("closed: " + ioSession.getRemoteAddress()); - } - - ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); - if(engine != null) - { - engine.closed(); - } - else - { - LOGGER.error("Unable to close ProtocolEngine as none was present"); - } - } - - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - if (IdleStatus.WRITER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).writerIdle(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).readerIdle(); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java deleted file mode 100644 index d0367b82f4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ /dev/null @@ -1,219 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExecutorThreadModel; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoSession; -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; - -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SocketConnectorFactory; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport -{ - private static final int UNKNOWN = -1; - private static final int TCP = 0; - - public NetworkConnection _connection; - private SocketAcceptor _acceptor; - private InetSocketAddress _address; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory) - { - int transport = getTransport(settings.getProtocol()); - - IoConnectorCreator stc; - switch(transport) - { - case TCP: - stc = new IoConnectorCreator(new SocketConnectorFactory() - { - public IoConnector newConnector() - { - return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - } - }); - _connection = stc.connect(delegate, settings, sslFactory); - break; - case UNKNOWN: - default: - throw new TransportException("Unknown protocol: " + settings.getProtocol()); - } - - return _connection; - } - - private static int getTransport(String transport) - { - if (transport.equals(Transport.TCP)) - { - return TCP; - } - - return UNKNOWN; - } - - public void close() - { - if(_connection != null) - { - _connection.close(); - } - if (_acceptor != null) - { - _acceptor.unbindAll(); - } - } - - public NetworkConnection getConnection() - { - return _connection; - } - - public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory, - final SSLContextFactory sslFactory) - { - int processors = config.getConnectorProcessors(); - - if (Transport.TCP.equalsIgnoreCase(config.getTransport())) - { - _acceptor = new SocketAcceptor(processors, new NewThreadExecutor()); - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)")); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - sc.setTcpNoDelay(config.getTcpNoDelay()); - sc.setSendBufferSize(config.getSendBufferSize()); - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - - if (config.getHost().equals(WILDCARD_ADDRESS)) - { - _address = new InetSocketAddress(config.getPort()); - } - else - { - _address = new InetSocketAddress(config.getHost(), config.getPort()); - } - } - else - { - throw new TransportException("Unknown transport: " + config.getTransport()); - } - - try - { - _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory)); - } - catch (IOException e) - { - throw new TransportException("Could not bind to " + _address, e); - } - } - - - private static class IoConnectorCreator - { - private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class); - - private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024; - - private SocketConnectorFactory _ioConnectorFactory; - - public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory) - { - _ioConnectorFactory = socketConnectorFactory; - } - - public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContextFactory sslFactory) - { - final IoConnector ioConnector = _ioConnectorFactory.newConnector(); - final SocketAddress address; - final String protocol = settings.getProtocol(); - final int port = settings.getPort(); - - if (Transport.TCP.equalsIgnoreCase(protocol)) - { - address = new InetSocketAddress(settings.getHost(), port); - } - else - { - throw new TransportException("Unknown transport: " + protocol); - } - - LOGGER.info("Attempting connection to " + address); - - if (ioConnector instanceof SocketConnector) - { - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)")); - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); - scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); - - // Don't have the connector's worker thread wait around for other - // connections (we only use one SocketConnector per connection - // at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - ((SocketConnector) ioConnector).setWorkerTimeout(0); - } - - ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig()); - future.join(); - if (!future.isConnected()) - { - throw new TransportException("Could not open connection"); - } - - IoSession session = future.getSession(); - session.setAttachment(receiver); - - return new MinaNetworkConnection(session); - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index be114e2fa1..22b9c5e784 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -25,55 +25,66 @@ import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; import org.apache.qpid.transport.Sender; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.TransportException; + /** * MinaSender */ + public class MinaSender implements Sender<java.nio.ByteBuffer> { - private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); - - private final IoSession _session; - private WriteFuture _lastWrite; + private static final int TIMEOUT = 2 * 60 * 1000; + + private final IoSession session; + private WriteFuture lastWrite = null; public MinaSender(IoSession session) { - _session = session; + this.session = session; } - public synchronized void send(java.nio.ByteBuffer msg) + public void send(java.nio.ByteBuffer buf) { - _log.debug("sending data:"); - ByteBuffer mina = ByteBuffer.allocate(msg.limit()); - mina.put(msg); - mina.flip(); - _lastWrite = _session.write(mina); - _log.debug("sent data:"); - } + if (session.isClosing()) + { + throw new TransportException("attempted to write to a closed socket"); + } - public synchronized void flush() - { - if (_lastWrite != null) + synchronized (this) { - _lastWrite.join(); + lastWrite = session.write(ByteBuffer.wrap(buf)); } } - public void close() + public void flush() { - // MINA will sometimes throw away in-progress writes when you ask it to close - flush(); - CloseFuture closed = _session.close(); + // pass + } + + public synchronized void close() + { + // MINA will sometimes throw away in-progress writes when you + // ask it to close + synchronized (this) + { + if (lastWrite != null) + { + lastWrite.join(); + } + } + CloseFuture closed = session.close(); closed.join(); } public void setIdleTimeout(int i) { - //TODO: - //We are instead using the setMax[Read|Write]IdleTime methods in - //MinaNetworkConnection for this. Should remove this method from - //sender interface, but currently being used by IoSender for 0-10. + //noop } + + public long getIdleTimeout() + { + return 0; + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java new file mode 100644 index 0000000000..84e66c25bd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -0,0 +1,135 @@ +package org.apache.qpid.transport.network.nio; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; + +public class NioHandler implements Runnable +{ + private Receiver<ByteBuffer> _receiver; + private SocketChannel _ch; + private ByteBuffer _readBuf; + private static Map<Long,NioSender> _handlers = new ConcurrentHashMap<Long,NioSender>(); + + private NioHandler(){} + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + NioHandler handler = new NioHandler(); + return handler.connectInternal(host,port,delegate); + } + + private Connection connectInternal(String host, int port, + ConnectionDelegate delegate) + { + try + { + SocketAddress address = new InetSocketAddress(host,port); + _ch = SocketChannel.open(); + _ch.socket().setReuseAddress(true); + _ch.configureBlocking(true); + _ch.socket().setTcpNoDelay(true); + if (address != null) + { + _ch.socket().connect(address); + } + while (_ch.isConnectionPending()) + { + + } + + } + catch (SocketException e) + { + + e.printStackTrace(); + } + catch (IOException e) + { + e.printStackTrace(); + } + + NioSender sender = new NioSender(_ch); + Connection con = new Connection(); + con.setSender(new Disassembler(sender, 64*1024 - 1)); + con.setConnectionDelegate(delegate); + + _handlers.put(con.getConnectionId(),sender); + + _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR); + + Thread t = new Thread(this); + t.start(); + + return con; + } + + public void run() + { + _readBuf = ByteBuffer.allocate(512); + long read = 0; + while(_ch.isConnected() && _ch.isOpen()) + { + try + { + read = _ch.read(_readBuf); + if (read > 0) + { + _readBuf.flip(); + ByteBuffer b = ByteBuffer.allocate(_readBuf.remaining()); + b.put(_readBuf); + b.flip(); + _readBuf.clear(); + _receiver.received(b); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + + //throw new EOFException("The underlying socket/channel has closed"); + } + + public static void startBatchingFrames(int connectionId) + { + NioSender sender = _handlers.get(connectionId); + sender.setStartBatching(); + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java new file mode 100644 index 0000000000..2fa875f279 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java @@ -0,0 +1,126 @@ +package org.apache.qpid.transport.network.nio; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.qpid.transport.Sender; + +public class NioSender implements Sender<java.nio.ByteBuffer> +{ + private final Object lock = new Object(); + private SocketChannel _ch; + private boolean _batch = false; + private ByteBuffer _batcher; + + public NioSender(SocketChannel ch) + { + this._ch = ch; + } + + public void send(java.nio.ByteBuffer buf) + { + if (_batch) + { + //System.out.println(_batcher.position() + " , " + buf.remaining() + " , " + buf.position() + ","+_batcher.capacity()); + if (_batcher.position() + buf.remaining() >= _batcher.capacity()) + { + _batcher.flip(); + write(_batcher); + _batcher.clear(); + if (buf.remaining() > _batcher.capacity()) + { + write(buf); + } + else + { + _batcher.put(buf); + } + } + else + { + _batcher.put(buf); + } + } + else + { + write(buf); + } + } + + public void flush() + { + // pass + } + + private void write(java.nio.ByteBuffer buf) + { + synchronized (lock) + { + if( _ch.isConnected() && _ch.isOpen()) + { + try + { + _ch.write(buf); + } + catch(Exception e) + { + e.fillInStackTrace(); + } + } + else + { + throw new RuntimeException("Trying to write on a closed socket"); + } + + } + } + + public void setStartBatching() + { + _batch = true; + _batcher = ByteBuffer.allocate(1024); + } + + public void close() + { + // MINA will sometimes throw away in-progress writes when you + // ask it to close + synchronized (lock) + { + try + { + _ch.close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } + + public void setIdleTimeout(int i) + { + //noop + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index 69e4b52edb..3f0966903d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -43,8 +43,8 @@ public class SecurityLayer Connection con; SSLSecurityLayer sslLayer; SASLSecurityLayer saslLayer; - - public SecurityLayer(Connection con) + + public void init(Connection con) throws TransportException { this.con = con; this.settings = con.getConnectionSettings(); @@ -55,9 +55,10 @@ public class SecurityLayer if (settings.isUseSASLEncryption()) { saslLayer = new SASLSecurityLayer(); - } + } + } - + public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate) { Sender<ByteBuffer> sender = delegate; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 2d9e4e9a7e..27255f79f6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -43,7 +43,8 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { this.delegate = delegate; log.debug("SASL Sender enabled"); } - + + @Override public void close() { @@ -64,11 +65,13 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { } } + @Override public void flush() { delegate.flush(); } + @Override public void send(ByteBuffer buf) { if (closed.get()) @@ -105,6 +108,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { } } + @Override public void setIdleTimeout(int i) { delegate.setIdleTimeout(i); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java index 0dd86d4560..14f28f8828 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java @@ -48,45 +48,51 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager kmf.init(ks, keyStorePassword.toCharArray()); this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; } - + + @Override public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { log.debug("chooseClientAlias:Returning alias " + alias); return alias; } + @Override public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { return delegate.chooseServerAlias(keyType, issuers, socket); } + @Override public X509Certificate[] getCertificateChain(String alias) { return delegate.getCertificateChain(alias); } + @Override public String[] getClientAliases(String keyType, Principal[] issuers) { log.debug("getClientAliases:Returning alias " + alias); return new String[]{alias}; } + @Override public PrivateKey getPrivateKey(String alias) { return delegate.getPrivateKey(alias); } + @Override public String[] getServerAliases(String keyType, Principal[] issuers) { return delegate.getServerAliases(keyType, issuers); } - + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { log.debug("chooseEngineClientAlias:Returning alias " + alias); return alias; } - + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { return delegate.chooseEngineServerAlias(keyType, issuers, engine); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java index e261860bf3..6f21c327e7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -31,6 +31,9 @@ public class URLHelper public static void parseOptions(Map<String, String> optionMap, String options) throws URLSyntaxException { + // options looks like this + // brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'' + if ((options == null) || (options.indexOf('=') == -1)) { return; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index ac8e3da3c2..516204fbd3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -143,9 +143,8 @@ public class FileUtils } /** - * Either opens the specified filename as an input stream or either the filesystem or classpath, - * or uses the default resource loaded using the specified class loader, if opening the file fails - * or no file name is specified. + * Either opens the specified filename as an input stream, or uses the default resource loaded using the + * specified class loader, if opening the file fails or no file name is specified. * * @param filename The name of the file to open. * @param defaultResource The name of the default resource on the classpath if the file cannot be opened. @@ -157,28 +156,28 @@ public class FileUtils { InputStream is = null; + // Flag to indicate whether the default resource should be used. By default this is true, so that the default + // is used when opening the file fails. + boolean useDefault = true; + // Try to open the file if one was specified. if (filename != null) { - // try on filesystem try { is = new BufferedInputStream(new FileInputStream(new File(filename))); + + // Clear the default flag because the file was succesfully opened. + useDefault = false; } catch (FileNotFoundException e) { - is = null; - } - - if (is == null) - { - // failed on filesystem, so try on classpath - is = cl.getResourceAsStream(filename); + // Ignore this exception, the default will be used instead. } } // Load the default resource if a file was not specified, or if opening the file failed. - if (is == null) + if (useDefault) { is = cl.getResourceAsStream(defaultResource); } @@ -340,7 +339,7 @@ public class FileUtils } //else we have a source directory - if (!dst.isDirectory() && !dst.mkdirs()) + if (!dst.isDirectory() && !dst.mkdir()) { throw new UnableToCopyException("Unable to create destination directory"); } |