summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main')
-rwxr-xr-xqpid/java/common/src/main/java/common.bnd2
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java467
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java227
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java351
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java)43
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java272
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java197
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java440
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java547
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java486
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java1026
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java240
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java488
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java151
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQException.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java208
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java108
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java154
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java57
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java83
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java50
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java)20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java47
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java96
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java78
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java123
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java130
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java92
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java)25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java93
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java87
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java51
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java231
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java435
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java274
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java81
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java149
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java219
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java65
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java135
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java126
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java25
63 files changed, 7346 insertions, 1152 deletions
diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd
index f12fbf9273..ef56ecec9e 100755
--- a/qpid/java/common/src/main/java/common.bnd
+++ b/qpid/java/common/src/main/java/common.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.9.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
new file mode 100644
index 0000000000..0c311b6645
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
@@ -0,0 +1,467 @@
+package org.apache.mina.common;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.nio.*;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
+{
+
+
+ private static final int MINIMUM_CAPACITY = 1;
+
+ public FixedSizeByteBufferAllocator ()
+ {
+ }
+
+ public ByteBuffer allocate( int capacity, boolean direct )
+ {
+ java.nio.ByteBuffer nioBuffer;
+ if( direct )
+ {
+ nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity );
+ }
+ else
+ {
+ nioBuffer = java.nio.ByteBuffer.allocate( capacity );
+ }
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer )
+ {
+ return new FixedSizeByteBuffer( nioBuffer );
+ }
+
+ public void dispose()
+ {
+ }
+
+
+
+ private static final class FixedSizeByteBuffer extends ByteBuffer
+ {
+ private java.nio.ByteBuffer buf;
+ private int mark = -1;
+
+
+ protected FixedSizeByteBuffer( java.nio.ByteBuffer buf )
+ {
+ this.buf = buf;
+ buf.order( ByteOrder.BIG_ENDIAN );
+ }
+
+ public synchronized void acquire()
+ {
+ }
+
+ public void release()
+ {
+ }
+
+ public java.nio.ByteBuffer buf()
+ {
+ return buf;
+ }
+
+ public boolean isPooled()
+ {
+ return false;
+ }
+
+ public void setPooled( boolean pooled )
+ {
+ }
+
+ public ByteBuffer duplicate() {
+ return new FixedSizeByteBuffer( this.buf.duplicate() );
+ }
+
+ public ByteBuffer slice() {
+ return new FixedSizeByteBuffer( this.buf.slice() );
+ }
+
+ public ByteBuffer asReadOnlyBuffer() {
+ return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() );
+ }
+
+ public byte[] array()
+ {
+ return buf.array();
+ }
+
+ public int arrayOffset()
+ {
+ return buf.arrayOffset();
+ }
+
+ public boolean isDirect()
+ {
+ return buf.isDirect();
+ }
+
+ public boolean isReadOnly()
+ {
+ return buf.isReadOnly();
+ }
+
+ public int capacity()
+ {
+ return buf.capacity();
+ }
+
+ public ByteBuffer capacity( int newCapacity )
+ {
+ if( newCapacity > capacity() )
+ {
+ throw new IllegalArgumentException();
+ }
+
+ return this;
+ }
+
+
+
+ public boolean isAutoExpand()
+ {
+ return false;
+ }
+
+ public ByteBuffer setAutoExpand( boolean autoExpand )
+ {
+ if(autoExpand) throw new IllegalArgumentException();
+ else return this;
+ }
+
+ public ByteBuffer expand( int pos, int expectedRemaining )
+ {
+ int end = pos + expectedRemaining;
+ if( end > capacity() )
+ {
+ // The buffer needs expansion.
+ capacity( end );
+ }
+
+ if( end > limit() )
+ {
+ // We call limit() directly to prevent StackOverflowError
+ buf.limit( end );
+ }
+ return this;
+ }
+
+ public int position()
+ {
+ return buf.position();
+ }
+
+ public ByteBuffer position( int newPosition )
+ {
+
+ buf.position( newPosition );
+ if( mark > newPosition )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public int limit()
+ {
+ return buf.limit();
+ }
+
+ public ByteBuffer limit( int newLimit )
+ {
+ buf.limit( newLimit );
+ if( mark > newLimit )
+ {
+ mark = -1;
+ }
+ return this;
+ }
+
+ public ByteBuffer mark()
+ {
+ buf.mark();
+ mark = position();
+ return this;
+ }
+
+ public int markValue()
+ {
+ return mark;
+ }
+
+ public ByteBuffer reset()
+ {
+ buf.reset();
+ return this;
+ }
+
+ public ByteBuffer clear()
+ {
+ buf.clear();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer flip()
+ {
+ buf.flip();
+ mark = -1;
+ return this;
+ }
+
+ public ByteBuffer rewind()
+ {
+ buf.rewind();
+ mark = -1;
+ return this;
+ }
+
+ public byte get()
+ {
+ return buf.get();
+ }
+
+ public ByteBuffer put( byte b )
+ {
+ buf.put( b );
+ return this;
+ }
+
+ public byte get( int index )
+ {
+ return buf.get( index );
+ }
+
+ public ByteBuffer put( int index, byte b )
+ {
+ buf.put( index, b );
+ return this;
+ }
+
+ public ByteBuffer get( byte[] dst, int offset, int length )
+ {
+ buf.get( dst, offset, length );
+ return this;
+ }
+
+ public ByteBuffer put( java.nio.ByteBuffer src )
+ {
+ buf.put( src );
+ return this;
+ }
+
+ public ByteBuffer put( byte[] src, int offset, int length )
+ {
+ buf.put( src, offset, length );
+ return this;
+ }
+
+ public ByteBuffer compact()
+ {
+ buf.compact();
+ mark = -1;
+ return this;
+ }
+
+ public ByteOrder order()
+ {
+ return buf.order();
+ }
+
+ public ByteBuffer order( ByteOrder bo )
+ {
+ buf.order( bo );
+ return this;
+ }
+
+ public char getChar()
+ {
+ return buf.getChar();
+ }
+
+ public ByteBuffer putChar( char value )
+ {
+ buf.putChar( value );
+ return this;
+ }
+
+ public char getChar( int index )
+ {
+ return buf.getChar( index );
+ }
+
+ public ByteBuffer putChar( int index, char value )
+ {
+ buf.putChar( index, value );
+ return this;
+ }
+
+ public CharBuffer asCharBuffer()
+ {
+ return buf.asCharBuffer();
+ }
+
+ public short getShort()
+ {
+ return buf.getShort();
+ }
+
+ public ByteBuffer putShort( short value )
+ {
+ buf.putShort( value );
+ return this;
+ }
+
+ public short getShort( int index )
+ {
+ return buf.getShort( index );
+ }
+
+ public ByteBuffer putShort( int index, short value )
+ {
+ buf.putShort( index, value );
+ return this;
+ }
+
+ public ShortBuffer asShortBuffer()
+ {
+ return buf.asShortBuffer();
+ }
+
+ public int getInt()
+ {
+ return buf.getInt();
+ }
+
+ public ByteBuffer putInt( int value )
+ {
+ buf.putInt( value );
+ return this;
+ }
+
+ public int getInt( int index )
+ {
+ return buf.getInt( index );
+ }
+
+ public ByteBuffer putInt( int index, int value )
+ {
+ buf.putInt( index, value );
+ return this;
+ }
+
+ public IntBuffer asIntBuffer()
+ {
+ return buf.asIntBuffer();
+ }
+
+ public long getLong()
+ {
+ return buf.getLong();
+ }
+
+ public ByteBuffer putLong( long value )
+ {
+ buf.putLong( value );
+ return this;
+ }
+
+ public long getLong( int index )
+ {
+ return buf.getLong( index );
+ }
+
+ public ByteBuffer putLong( int index, long value )
+ {
+ buf.putLong( index, value );
+ return this;
+ }
+
+ public LongBuffer asLongBuffer()
+ {
+ return buf.asLongBuffer();
+ }
+
+ public float getFloat()
+ {
+ return buf.getFloat();
+ }
+
+ public ByteBuffer putFloat( float value )
+ {
+ buf.putFloat( value );
+ return this;
+ }
+
+ public float getFloat( int index )
+ {
+ return buf.getFloat( index );
+ }
+
+ public ByteBuffer putFloat( int index, float value )
+ {
+ buf.putFloat( index, value );
+ return this;
+ }
+
+ public FloatBuffer asFloatBuffer()
+ {
+ return buf.asFloatBuffer();
+ }
+
+ public double getDouble()
+ {
+ return buf.getDouble();
+ }
+
+ public ByteBuffer putDouble( double value )
+ {
+ buf.putDouble( value );
+ return this;
+ }
+
+ public double getDouble( int index )
+ {
+ return buf.getDouble( index );
+ }
+
+ public ByteBuffer putDouble( int index, double value )
+ {
+ buf.putDouble( index, value );
+ return this;
+ }
+
+ public DoubleBuffer asDoubleBuffer()
+ {
+ return buf.asDoubleBuffer();
+ }
+
+
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
new file mode 100644
index 0000000000..4fd28c4eb5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFutureListener;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * A default implementation of {@link org.apache.mina.common.IoFuture}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+public class DefaultIoFuture implements IoFuture
+{
+ private final IoSession session;
+ private final Object lock;
+ private List listeners;
+ private Object result;
+ private boolean ready;
+
+
+ /**
+ * Creates a new instance.
+ *
+ * @param session an {@link IoSession} which is associated with this future
+ */
+ public DefaultIoFuture( IoSession session )
+ {
+ this.session = session;
+ this.lock = this;
+ }
+
+ /**
+ * Creates a new instance which uses the specified object as a lock.
+ */
+ public DefaultIoFuture( IoSession session, Object lock )
+ {
+ if( lock == null )
+ {
+ throw new NullPointerException( "lock" );
+ }
+ this.session = session;
+ this.lock = lock;
+ }
+
+ public IoSession getSession()
+ {
+ return session;
+ }
+
+ public Object getLock()
+ {
+ return lock;
+ }
+
+ public void join()
+ {
+ synchronized( lock )
+ {
+ while( !ready )
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+ }
+
+ public boolean join( long timeoutInMillis )
+ {
+ long startTime = ( timeoutInMillis <= 0 ) ? 0 : System
+ .currentTimeMillis();
+ long waitTime = timeoutInMillis;
+
+ synchronized( lock )
+ {
+ if( ready )
+ {
+ return ready;
+ }
+ else if( waitTime <= 0 )
+ {
+ return ready;
+ }
+
+ for( ;; )
+ {
+ try
+ {
+ lock.wait( waitTime );
+ }
+ catch( InterruptedException e )
+ {
+ }
+
+ if( ready )
+ return true;
+ else
+ {
+ waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime );
+ if( waitTime <= 0 )
+ {
+ return ready;
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isReady()
+ {
+ synchronized( lock )
+ {
+ return ready;
+ }
+ }
+
+ /**
+ * Sets the result of the asynchronous operation, and mark it as finished.
+ */
+ protected void setValue( Object newValue )
+ {
+ synchronized( lock )
+ {
+ // Allow only once.
+ if( ready )
+ {
+ return;
+ }
+
+ result = newValue;
+ ready = true;
+ lock.notifyAll();
+
+ notifyListeners();
+ }
+ }
+
+ /**
+ * Returns the result of the asynchronous operation.
+ */
+ protected Object getValue()
+ {
+ synchronized( lock )
+ {
+ return result;
+ }
+ }
+
+ public void addListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ if(listeners == null)
+ {
+ listeners = new ArrayList();
+ }
+ listeners.add( listener );
+ if( ready )
+ {
+ listener.operationComplete( this );
+ }
+ }
+ }
+
+ public void removeListener( IoFutureListener listener )
+ {
+ if( listener == null )
+ {
+ throw new NullPointerException( "listener" );
+ }
+
+ synchronized( lock )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ private void notifyListeners()
+ {
+ synchronized( lock )
+ {
+
+ if(listeners != null)
+ {
+
+ for( Iterator i = listeners.iterator(); i.hasNext(); ) {
+ ( ( IoFutureListener ) i.next() ).operationComplete( this );
+ }
+ }
+ }
+ }
+}
+
+
+
diff --git a/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java b/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
new file mode 100644
index 0000000000..5723ffbaa9
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.IdentityHashSet;
+
+/**
+ * A helper which provides addition and removal of {@link IoServiceListener}s and firing
+ * events.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 446526 $, $Date: 2006-09-15 01:44:11 -0400 (Fri, 15 Sep 2006) $
+ */
+public class IoServiceListenerSupport
+{
+ /**
+ * A list of {@link IoServiceListener}s.
+ */
+ private final List listeners = new ArrayList();
+
+ /**
+ * Tracks managed <tt>serviceAddress</tt>es.
+ */
+ private final Set managedServiceAddresses = new HashSet();
+
+ /**
+ * Tracks managed sesssions with <tt>serviceAddress</tt> as a key.
+ */
+ private final Map managedSessions = new HashMap();
+
+ /**
+ * Creates a new instance.
+ */
+ public IoServiceListenerSupport()
+ {
+ }
+
+ /**
+ * Adds a new listener.
+ */
+ public void add( IoServiceListener listener )
+ {
+ synchronized( listeners )
+ {
+ listeners.add( listener );
+ }
+ }
+
+ /**
+ * Removes an existing listener.
+ */
+ public void remove( IoServiceListener listener )
+ {
+ synchronized( listeners )
+ {
+ listeners.remove( listener );
+ }
+ }
+
+ public Set getManagedServiceAddresses()
+ {
+ return Collections.unmodifiableSet( managedServiceAddresses );
+ }
+
+ public boolean isManaged( SocketAddress serviceAddress )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ return managedServiceAddresses.contains( serviceAddress );
+ }
+ }
+
+ public Set getManagedSessions( SocketAddress serviceAddress )
+ {
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ if( sessions == null )
+ {
+ sessions = new IdentityHashSet();
+ }
+ }
+
+ synchronized( sessions )
+ {
+ return new IdentityHashSet( sessions );
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
+ * for all registered listeners.
+ */
+ public void fireServiceActivated(
+ IoService service, SocketAddress serviceAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ if( !managedServiceAddresses.add( serviceAddress ) )
+ {
+ return;
+ }
+ }
+
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).serviceActivated(
+ service, serviceAddress, handler, config );
+ }
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
+ * for all registered listeners.
+ */
+ public synchronized void fireServiceDeactivated(
+ IoService service, SocketAddress serviceAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ synchronized( managedServiceAddresses )
+ {
+ if( !managedServiceAddresses.remove( serviceAddress ) )
+ {
+ return;
+ }
+ }
+
+ try
+ {
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).serviceDeactivated(
+ service, serviceAddress, handler, config );
+ }
+ }
+ }
+ finally
+ {
+ disconnectSessions( serviceAddress, config );
+ }
+ }
+
+
+ /**
+ * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
+ */
+ public void fireSessionCreated( IoSession session )
+ {
+ SocketAddress serviceAddress = session.getServiceAddress();
+
+ // Get the session set.
+ boolean firstSession = false;
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ if( sessions == null )
+ {
+ sessions = new IdentityHashSet();
+ managedSessions.put( serviceAddress, sessions );
+ firstSession = true;
+ }
+ }
+
+ // If already registered, ignore.
+ synchronized( sessions )
+ {
+ if ( !sessions.add( session ) )
+ {
+ return;
+ }
+ }
+
+ // If the first connector session, fire a virtual service activation event.
+ if( session.getService() instanceof IoConnector && firstSession )
+ {
+ fireServiceActivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+
+ // Fire session events.
+ session.getFilterChain().fireSessionCreated( session );
+ session.getFilterChain().fireSessionOpened( session);
+
+ // Fire listener events.
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).sessionCreated( session );
+ }
+ }
+ }
+
+ /**
+ * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
+ */
+ public void fireSessionDestroyed( IoSession session )
+ {
+ SocketAddress serviceAddress = session.getServiceAddress();
+
+ // Get the session set.
+ Set sessions;
+ boolean lastSession = false;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ // Ignore if unknown.
+ if( sessions == null )
+ {
+ return;
+ }
+
+ // Try to remove the remaining empty seession set after removal.
+ synchronized( sessions )
+ {
+ sessions.remove( session );
+ if( sessions.isEmpty() )
+ {
+ managedSessions.remove( serviceAddress );
+ lastSession = true;
+ }
+ }
+ }
+
+ // Fire session events.
+ session.getFilterChain().fireSessionClosed( session );
+
+ // Fire listener events.
+ try
+ {
+ synchronized( listeners )
+ {
+ for( Iterator i = listeners.iterator(); i.hasNext(); )
+ {
+ ( ( IoServiceListener ) i.next() ).sessionDestroyed( session );
+ }
+ }
+ }
+ finally
+ {
+ // Fire a virtual service deactivation event for the last session of the connector.
+ //TODO double-check that this is *STILL* the last session. May not be the case
+ if( session.getService() instanceof IoConnector && lastSession )
+ {
+ fireServiceDeactivated(
+ session.getService(), session.getServiceAddress(),
+ session.getHandler(), session.getServiceConfig() );
+ }
+ }
+ }
+
+ private void disconnectSessions( SocketAddress serviceAddress, IoServiceConfig config )
+ {
+ if( !( config instanceof IoAcceptorConfig ) )
+ {
+ return;
+ }
+
+ if( !( ( IoAcceptorConfig ) config ).isDisconnectOnUnbind() )
+ {
+ return;
+ }
+
+ Set sessions;
+ synchronized( managedSessions )
+ {
+ sessions = ( Set ) managedSessions.get( serviceAddress );
+ }
+
+ if( sessions == null )
+ {
+ return;
+ }
+
+ Set sessionsCopy;
+
+ // Create a copy to avoid ConcurrentModificationException
+ synchronized( sessions )
+ {
+ sessionsCopy = new IdentityHashSet( sessions );
+ }
+
+ final CountDownLatch latch = new CountDownLatch(sessionsCopy.size());
+
+ for( Iterator i = sessionsCopy.iterator(); i.hasNext(); )
+ {
+ ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
+ {
+ public void operationComplete( IoFuture future )
+ {
+ latch.countDown();
+ }
+ } );
+ }
+
+ try
+ {
+ latch.await();
+ }
+ catch( InterruptedException ie )
+ {
+ // Ignored
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
index 1f69973b96..47f19aa76d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
@@ -1,4 +1,6 @@
-/*
+package org.apache.mina.filter;
+
+import org.apache.mina.common.IoFilter;/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,30 +20,29 @@
* under the License.
*
*/
-package org.apache.qpid.transport.network;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Sender;
-public interface NetworkConnection
+public class WriteBufferFullExeception extends RuntimeException
{
- Sender<ByteBuffer> getSender();
+ private IoFilter.WriteRequest _writeRequest;
- void close();
+ public WriteBufferFullExeception()
+ {
+ this(null);
+ }
- /**
- * Returns the remote address of the underlying socket.
- */
- SocketAddress getRemoteAddress();
+ public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest)
+ {
+ _writeRequest = writeRequest;
+ }
- /**
- * Returns the local address of the underlying socket.
- */
- SocketAddress getLocalAddress();
- void setMaxWriteIdle(int sec);
+ public void setWriteRequest(IoFilter.WriteRequest writeRequest)
+ {
+ _writeRequest = writeRequest;
+ }
- void setMaxReadIdle(int sec);
-} \ No newline at end of file
+ public IoFilter.WriteRequest getWriteRequest()
+ {
+ return _writeRequest;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
new file mode 100644
index 0000000000..4e9db9071a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than
+ * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the
+ * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to
+ * cause an Out of Memory exception due to a back log of unprocessed messages.
+ *
+ * This is should only be viewed as a temporary work around for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter as this issue will always occur. On a machine
+ * where the network is slower than the local producer.
+ *
+ * Suggested improvement is to allow implementation of policices on what to do when buffer is full.
+ *
+ * They could be:
+ * Block - As this does
+ * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
+ *
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class WriteBufferLimitFilterBuilder
+{
+ public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";
+
+ private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;
+
+ private volatile boolean throwNotBlock = false;
+
+ private volatile int maximumConnectionBufferCount;
+ private volatile long maximumConnectionBufferSize;
+
+ private final Object _blockLock = new Object();
+
+ private int _blockWaiters = 0;
+
+
+ public WriteBufferLimitFilterBuilder()
+ {
+ this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
+ }
+
+ public WriteBufferLimitFilterBuilder(int maxWriteBufferSize)
+ {
+ setMaximumConnectionBufferCount(maxWriteBufferSize);
+ }
+
+
+ /**
+ * Set the maximum amount pending items in the writeQueue for a given session.
+ * Changing the value will only take effect when new data is received for a
+ * connection, including existing connections. Default value is 5000 msgs.
+ *
+ * @param maximumConnectionBufferCount New buffer size. Must be > 0
+ */
+ public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount)
+ {
+ this.maximumConnectionBufferCount = maximumConnectionBufferCount;
+ this.maximumConnectionBufferSize = 0;
+ }
+
+ public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize)
+ {
+ this.maximumConnectionBufferSize = maximumConnectionBufferSize;
+ this.maximumConnectionBufferCount = 0;
+ }
+
+ /**
+ * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself
+ * before and after that filter.
+ *
+ * @param chain {@link IoFilterChain} to attach self to.
+ */
+ public void attach(IoFilterChain chain)
+ {
+ String name = getThreadPoolFilterEntryName(chain.getAll());
+
+ chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+ }
+
+ /**
+ * Attach this filter to the specified builder. It will search for the
+ * {@link ExecutorFilter}, and attach itself before and after that filter.
+ *
+ * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
+ */
+ public void attach(DefaultIoFilterChainBuilder builder)
+ {
+ String name = getThreadPoolFilterEntryName(builder.getAll());
+
+ builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
+ }
+
+ private String getThreadPoolFilterEntryName(List entries)
+ {
+ Iterator i = entries.iterator();
+
+ while (i.hasNext())
+ {
+ IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next();
+
+ if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class))
+ {
+ return entry.getName();
+ }
+ }
+
+ throw new IllegalStateException("Chain does not contain a ExecutorFilter");
+ }
+
+
+ public class SendLimit extends IoFilterAdapter
+ {
+ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
+ {
+ try
+ {
+ waitTillSendAllowed(session);
+ }
+ catch (WriteBufferFullExeception wbfe)
+ {
+ nextFilter.exceptionCaught(session, wbfe);
+ }
+
+ if (writeRequest.getMessage() instanceof ByteBuffer)
+ {
+ increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage());
+ }
+
+ nextFilter.filterWrite(session, writeRequest);
+ }
+
+ private void increasePendingWriteSize(IoSession session, ByteBuffer message)
+ {
+ synchronized (session)
+ {
+ Long pendingSize = getScheduledWriteBytes(session) + message.remaining();
+ session.setAttribute(PENDING_SIZE, pendingSize);
+ }
+ }
+
+ private boolean sendAllowed(IoSession session)
+ {
+ if (session.isClosing())
+ {
+ return true;
+ }
+
+ int lmswm = maximumConnectionBufferCount;
+ long lmswb = maximumConnectionBufferSize;
+
+ return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm)
+ && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb);
+ }
+
+ private long getScheduledWriteBytes(IoSession session)
+ {
+ synchronized (session)
+ {
+ Long i = (Long) session.getAttribute(PENDING_SIZE);
+ return null == i ? 0 : i;
+ }
+ }
+
+ private void waitTillSendAllowed(IoSession session)
+ {
+ synchronized (_blockLock)
+ {
+ if (throwNotBlock)
+ {
+ throw new WriteBufferFullExeception();
+ }
+
+ _blockWaiters++;
+
+ while (!sendAllowed(session))
+ {
+ try
+ {
+ _blockLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore.
+ }
+ }
+ _blockWaiters--;
+ }
+ }
+
+ public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
+ {
+ if (message instanceof ByteBuffer)
+ {
+ decrementPendingWriteSize(session, (ByteBuffer) message);
+ }
+ notifyWaitingWriters();
+ nextFilter.messageSent(session, message);
+ }
+
+ private void decrementPendingWriteSize(IoSession session, ByteBuffer message)
+ {
+ synchronized (session)
+ {
+ session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining());
+ }
+ }
+
+ private void notifyWaitingWriters()
+ {
+ synchronized (_blockLock)
+ {
+ if (_blockWaiters != 0)
+ {
+ _blockLock.notifyAll();
+ }
+ }
+
+ }
+
+ }//SentLimit
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
new file mode 100644
index 0000000000..3f7e206cb4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link ProtocolDecoder} that cumulates the content of received
+ * buffers to a <em>cumulative buffer</em> to help users implement decoders.
+ * <p>
+ * If the received {@link ByteBuffer} is only a part of a message.
+ * decoders should cumulate received buffers to make a message complete or
+ * to postpone decoding until more buffers arrive.
+ * <p>
+ * Here is an example decoder that decodes CRLF terminated lines into
+ * <code>Command</code> objects:
+ * <pre>
+ * public class CRLFTerminatedCommandLineDecoder
+ * extends CumulativeProtocolDecoder {
+ *
+ * private Command parseCommand(ByteBuffer in) {
+ * // Convert the bytes in the specified buffer to a
+ * // Command object.
+ * ...
+ * }
+ *
+ * protected boolean doDecode(IoSession session, ByteBuffer in,
+ * ProtocolDecoderOutput out)
+ * throws Exception {
+ *
+ * // Remember the initial position.
+ * int start = in.position();
+ *
+ * // Now find the first CRLF in the buffer.
+ * byte previous = 0;
+ * while (in.hasRemaining()) {
+ * byte current = in.get();
+ *
+ * if (previous == '\r' && current == '\n') {
+ * // Remember the current position and limit.
+ * int position = in.position();
+ * int limit = in.limit();
+ * try {
+ * in.position(start);
+ * in.limit(position);
+ * // The bytes between in.position() and in.limit()
+ * // now contain a full CRLF terminated line.
+ * out.write(parseCommand(in.slice()));
+ * } finally {
+ * // Set the position to point right after the
+ * // detected line and set the limit to the old
+ * // one.
+ * in.position(position);
+ * in.limit(limit);
+ * }
+ * // Decoded one line; CumulativeProtocolDecoder will
+ * // call me again until I return false. So just
+ * // return true until there are no more lines in the
+ * // buffer.
+ * return true;
+ * }
+ *
+ * previous = current;
+ * }
+ *
+ * // Could not find CRLF in the buffer. Reset the initial
+ * // position to the one we recorded above.
+ * in.position(start);
+ *
+ * return false;
+ * }
+ * }
+ * </pre>
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
+
+ private static final String BUFFER = OurCumulativeProtocolDecoder.class
+ .getName()
+ + ".Buffer";
+
+ /**
+ * Creates a new instance.
+ */
+ protected OurCumulativeProtocolDecoder() {
+ }
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is NOT compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+ boolean usingSessionBuffer = true;
+ ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
+ // If we have a session buffer, append data to that; otherwise
+ // use the buffer read from the network directly.
+ if (buf != null) {
+ buf.put(in);
+ buf.flip();
+ } else {
+ buf = in;
+ usingSessionBuffer = false;
+ }
+
+ for (;;) {
+ int oldPos = buf.position();
+ boolean decoded = doDecode(session, buf, out);
+ if (decoded) {
+ if (buf.position() == oldPos) {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if (!buf.hasRemaining()) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if (buf.hasRemaining()) {
+ storeRemainingInSession(buf, session);
+ } else {
+ if (usingSessionBuffer)
+ removeSessionBuffer(session);
+ }
+ }
+
+ /**
+ * Implement this method to consume the specified cumulative buffer and
+ * decode its content into message(s).
+ *
+ * @param in the cumulative buffer
+ * @return <tt>true</tt> if and only if there's more to decode in the buffer
+ * and you want to have <tt>doDecode</tt> method invoked again.
+ * Return <tt>false</tt> if remaining data is not enough to decode,
+ * then this method will be invoked again when more data is cumulated.
+ * @throws Exception if cannot decode <tt>in</tt>.
+ */
+ protected abstract boolean doDecode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception;
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose(IoSession session) throws Exception {
+ removeSessionBuffer(session);
+ }
+
+ private void removeSessionBuffer(IoSession session) {
+ ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
+ if (buf != null) {
+ buf.release();
+ }
+ }
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
+ ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
+ remainingBuf.setAutoExpand(true);
+ remainingBuf.order(buf.order());
+ remainingBuf.put(buf);
+ session.setAttribute(BUFFER, remainingBuf);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
new file mode 100644
index 0000000000..b8c6f29720
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
@@ -0,0 +1,440 @@
+package org.apache.mina.filter.codec;
+
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import org.apache.mina.common.*;
+import org.apache.mina.common.support.DefaultWriteFuture;
+import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
+import org.apache.mina.util.SessionLog;
+import org.apache.mina.util.Queue;
+
+
+public class QpidProtocolCodecFilter extends IoFilterAdapter
+{
+ public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
+ public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
+
+ private static final Class[] EMPTY_PARAMS = new Class[0];
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
+
+ private final ProtocolCodecFactory factory;
+
+ public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
+ {
+ if( factory == null )
+ {
+ throw new NullPointerException( "factory" );
+ }
+ this.factory = factory;
+ }
+
+ public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
+ {
+ if( encoder == null )
+ {
+ throw new NullPointerException( "encoder" );
+ }
+ if( decoder == null )
+ {
+ throw new NullPointerException( "decoder" );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder()
+ {
+ return encoder;
+ }
+
+ public ProtocolDecoder getDecoder()
+ {
+ return decoder;
+ }
+ };
+ }
+
+ public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
+ {
+ if( encoderClass == null )
+ {
+ throw new NullPointerException( "encoderClass" );
+ }
+ if( decoderClass == null )
+ {
+ throw new NullPointerException( "decoderClass" );
+ }
+ if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
+ {
+ throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
+ }
+ if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
+ {
+ throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
+ }
+ try
+ {
+ encoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
+ }
+ try
+ {
+ decoderClass.getConstructor( EMPTY_PARAMS );
+ }
+ catch( NoSuchMethodException e )
+ {
+ throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
+ }
+
+ this.factory = new ProtocolCodecFactory()
+ {
+ public ProtocolEncoder getEncoder() throws Exception
+ {
+ return ( ProtocolEncoder ) encoderClass.newInstance();
+ }
+
+ public ProtocolDecoder getDecoder() throws Exception
+ {
+ return ( ProtocolDecoder ) decoderClass.newInstance();
+ }
+ };
+ }
+
+ public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
+ {
+ if( parent.contains( ProtocolCodecFilter.class ) )
+ {
+ throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
+ }
+ }
+
+ public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( !( message instanceof ByteBuffer ) )
+ {
+ nextFilter.messageReceived( session, message );
+ return;
+ }
+
+ ByteBuffer in = ( ByteBuffer ) message;
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+
+ try
+ {
+ decoder.decode( session, in, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ pde.setHexdump( in.getHexDump() );
+ throw pde;
+ }
+ finally
+ {
+ // Dispose the decoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeDecoder( session );
+ }
+
+ // Release the read buffer.
+ in.release();
+
+ decoderOut.flush();
+ }
+ }
+
+ public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
+ {
+ if( message instanceof HiddenByteBuffer )
+ {
+ return;
+ }
+
+ if( !( message instanceof MessageByteBuffer ) )
+ {
+ nextFilter.messageSent( session, message );
+ return;
+ }
+
+ nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
+ }
+
+ public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
+ {
+ Object message = writeRequest.getMessage();
+ if( message instanceof ByteBuffer )
+ {
+ nextFilter.filterWrite( session, writeRequest );
+ return;
+ }
+
+ ProtocolEncoder encoder = getEncoder( session );
+ ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
+
+ try
+ {
+ encoder.encode( session, message, encoderOut );
+ encoderOut.flush();
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ new MessageByteBuffer( writeRequest.getMessage() ),
+ writeRequest.getFuture(), writeRequest.getDestination() ) );
+ }
+ catch( Throwable t )
+ {
+ ProtocolEncoderException pee;
+ if( t instanceof ProtocolEncoderException )
+ {
+ pee = ( ProtocolEncoderException ) t;
+ }
+ else
+ {
+ pee = new ProtocolEncoderException( t );
+ }
+ throw pee;
+ }
+ finally
+ {
+ // Dispose the encoder if this session is connectionless.
+ if( session.getTransportType().isConnectionless() )
+ {
+ disposeEncoder( session );
+ }
+ }
+ }
+
+ public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ // Call finishDecode() first when a connection is closed.
+ ProtocolDecoder decoder = getDecoder( session );
+ ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
+ try
+ {
+ decoder.finishDecode( session, decoderOut );
+ }
+ catch( Throwable t )
+ {
+ ProtocolDecoderException pde;
+ if( t instanceof ProtocolDecoderException )
+ {
+ pde = ( ProtocolDecoderException ) t;
+ }
+ else
+ {
+ pde = new ProtocolDecoderException( t );
+ }
+ throw pde;
+ }
+ finally
+ {
+ // Dispose all.
+ disposeEncoder( session );
+ disposeDecoder( session );
+
+ decoderOut.flush();
+ }
+
+ nextFilter.sessionClosed( session );
+ }
+
+ private ProtocolEncoder getEncoder( IoSession session ) throws Exception
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
+ if( encoder == null )
+ {
+ encoder = factory.getEncoder();
+ session.setAttribute( ENCODER, encoder );
+ }
+ return encoder;
+ }
+
+ private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
+ }
+
+ private ProtocolDecoder getDecoder( IoSession session ) throws Exception
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
+ if( decoder == null )
+ {
+ decoder = factory.getDecoder();
+ session.setAttribute( DECODER, decoder );
+ }
+ return decoder;
+ }
+
+ private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
+ {
+ return new SimpleProtocolDecoderOutput( session, nextFilter );
+ }
+
+ private void disposeEncoder( IoSession session )
+ {
+ ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
+ if( encoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ encoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Failed to dispose: " + encoder.getClass().getName() +
+ " (" + encoder + ')' );
+ }
+ }
+
+ private void disposeDecoder( IoSession session )
+ {
+ ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
+ if( decoder == null )
+ {
+ return;
+ }
+
+ try
+ {
+ decoder.dispose( session );
+ }
+ catch( Throwable t )
+ {
+ SessionLog.warn(
+ session,
+ "Falied to dispose: " + decoder.getClass().getName() +
+ " (" + decoder + ')' );
+ }
+ }
+
+ private static class HiddenByteBuffer extends ByteBufferProxy
+ {
+ private HiddenByteBuffer( ByteBuffer buf )
+ {
+ super( buf );
+ }
+ }
+
+ private static class MessageByteBuffer extends ByteBufferProxy
+ {
+ private final Object message;
+
+ private MessageByteBuffer( Object message )
+ {
+ super( EMPTY_BUFFER );
+ this.message = message;
+ }
+
+ public void acquire()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+
+ public void release()
+ {
+ // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
+ }
+ }
+
+ private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
+ {
+ private ByteBuffer buffer;
+
+ private final IoSession session;
+ private final IoFilter.NextFilter nextFilter;
+ private final IoFilter.WriteRequest writeRequest;
+
+ public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
+ {
+ this.session = session;
+ this.nextFilter = nextFilter;
+ this.writeRequest = writeRequest;
+ }
+
+
+
+ public void write( ByteBuffer buf )
+ {
+ if(buffer != null)
+ {
+ flush();
+ }
+ buffer = buf;
+ }
+
+ public void mergeAll()
+ {
+ }
+
+ public WriteFuture flush()
+ {
+ WriteFuture future = null;
+ if( buffer == null )
+ {
+ return null;
+ }
+ else
+ {
+ ByteBuffer buf = buffer;
+ // Flush only when the buffer has remaining.
+ if( buf.hasRemaining() )
+ {
+ future = doFlush( buf );
+ }
+
+ }
+
+ return future;
+ }
+
+
+ protected WriteFuture doFlush( ByteBuffer buf )
+ {
+ WriteFuture future = new DefaultWriteFuture( session );
+ nextFilter.filterWrite(
+ session,
+ new IoFilter.WriteRequest(
+ buf,
+ future, writeRequest.getDestination() ) );
+ return future;
+ }
+ }
+}
+
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
new file mode 100644
index 0000000000..e5360d32e0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.Queue;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.NamePreservingRunnable;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class MultiThreadSocketAcceptor extends SocketAcceptor
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Executor executor;
+ private final Object lock = new Object();
+ private final int id = nextId ++;
+ private final String threadName = "SocketAcceptor-" + id;
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+
+ /**
+ * Create an acceptor with a single processing thread using a NewThreadExecutor
+ */
+ public MultiThreadSocketAcceptor()
+ {
+ this( 1, new NewThreadExecutor() );
+ }
+
+ /**
+ * Create an acceptor with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketAcceptor( int processorCount, Executor executor )
+ {
+ if( processorCount < 1 )
+ {
+ throw new IllegalArgumentException( "Must have at least one processor" );
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for( int i = 0; i < processorCount; i++ )
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
+ }
+ }
+
+
+ /**
+ * Binds to the specified <code>address</code> and handles incoming connections with the specified
+ * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
+ *
+ * @throws IOException if failed to bind
+ */
+ public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+ {
+ if( handler == null )
+ {
+ throw new NullPointerException( "handler" );
+ }
+
+ if( address != null && !( address instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+ }
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ throw request.exception;
+ }
+ }
+
+
+ private synchronized void startupWorker() throws IOException
+ {
+ synchronized( lock )
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+
+ executor.execute( new NamePreservingRunnable( worker ) );
+ }
+ }
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ CancellationRequest request = new CancellationRequest( address );
+
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( request );
+ }
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ request.exception.fillInStackTrace();
+
+ throw request.exception;
+ }
+ }
+
+
+ private class Worker implements Runnable
+ {
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
+
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( lock )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ try
+ {
+
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+
+ MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
+ MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
+ req.config, ch, req.handler, req.address );
+
+ // New Interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// SocketAcceptor.this, nextProcessor(), getListeners(),
+// req.config, ch, req.handler, req.address );
+
+
+ getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+ session.getIoProcessor().addNew( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ RegistrationRequest req;
+
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = null;
+
+ try
+ {
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking( false );
+
+ // Configure the server socket,
+ SocketAcceptorConfig cfg;
+ if( req.config instanceof SocketAcceptorConfig )
+ {
+ cfg = ( SocketAcceptorConfig ) req.config;
+ }
+ else
+ {
+ cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+ }
+
+ ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+ ssc.socket().setReceiveBufferSize(
+ ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+ // and bind.
+ ssc.socket().bind( req.address, cfg.getBacklog() );
+ if( req.address == null || req.address.getPort() == 0 )
+ {
+ req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
+ }
+ ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+ synchronized( channels )
+ {
+ channels.put( req.address, ssc );
+ }
+
+ getListeners().fireServiceActivated(
+ this, req.address, req.handler, req.config );
+ }
+ catch( IOException e )
+ {
+ req.exception = e;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+
+ req.notifyAll();
+ }
+
+ if( ssc != null && req.exception != null )
+ {
+ try
+ {
+ ssc.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+ }
+
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ CancellationRequest request;
+
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc;
+ synchronized( channels )
+ {
+ ssc = ( ServerSocketChannel ) channels.remove( request.address );
+ }
+
+ // close the channel
+ try
+ {
+ if( ssc == null )
+ {
+ request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ssc.keyFor( selector );
+ request.registrationRequest = ( RegistrationRequest ) key.attachment();
+ key.cancel();
+
+ selector.wakeup(); // wake up again to trigger thread death
+
+ ssc.close();
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notifyAll();
+ }
+
+ if( request.exception == null )
+ {
+ getListeners().fireServiceDeactivated(
+ this, request.address,
+ request.registrationRequest.handler,
+ request.registrationRequest.config );
+ }
+ }
+ }
+ }
+
+ private static class RegistrationRequest
+ {
+ private InetSocketAddress address;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+ private IOException exception;
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ this.address = ( InetSocketAddress ) address;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+
+
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+ private boolean done;
+ private RegistrationRequest registrationRequest;
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
new file mode 100644
index 0000000000..7344f70078
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class MultiThreadSocketConnector extends SocketConnector
+{
+ /** @noinspection StaticNonFinalField */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+
+ private final Queue connectQueue = new Queue();
+ private final MultiThreadSocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private final Executor executor;
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private int workerTimeout = 60; // 1 min.
+
+ /** Create a connector with a single processing thread using a NewThreadExecutor */
+ public MultiThreadSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public MultiThreadSocketConnector(int processorCount, Executor executor)
+ {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * How many seconds to keep the connection thread alive between connection requests
+ *
+ * @return Number of seconds to keep connection thread alive
+ */
+ public int getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+ */
+ public void setWorkerTimeout(int workerTimeout)
+ {
+ if (workerTimeout < 0)
+ {
+ throw new IllegalArgumentException("Must be >= 0");
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+ return connect(address, null, handler, config);
+ }
+
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config)
+ {
+ if (address == null)
+ {
+ throw new NullPointerException("address");
+ }
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+
+ if (!(address instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected address type: "
+ + address.getClass());
+ }
+
+ if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected local address type: "
+ + localAddress.getClass());
+ }
+
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+
+ SocketChannel ch = null;
+ boolean success = false;
+ try
+ {
+ ch = SocketChannel.open();
+ ch.socket().setReuseAddress(true);
+ if (localAddress != null)
+ {
+ ch.socket().bind(localAddress);
+ }
+
+ ch.configureBlocking(false);
+
+ if (ch.connect(address))
+ {
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ newSession(ch, handler, config, future);
+ success = true;
+ return future;
+ }
+
+ success = true;
+ }
+ catch (IOException e)
+ {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ finally
+ {
+ if (!success && ch != null)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+
+ ConnectionRequest request = new ConnectionRequest(ch, handler, config);
+ synchronized (lock)
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e2)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e2);
+ }
+
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ }
+
+ synchronized (connectQueue)
+ {
+ connectQueue.push(request);
+ }
+ selector.wakeup();
+
+ return request;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if (worker == null)
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute(new NamePreservingRunnable(worker));
+ }
+ }
+
+ private void registerNew()
+ {
+ if (connectQueue.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ ConnectionRequest req;
+ synchronized (connectQueue)
+ {
+ req = (ConnectionRequest) connectQueue.pop();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register(selector, SelectionKey.OP_CONNECT, req);
+ }
+ catch (IOException e)
+ {
+ req.setException(e);
+ }
+ }
+ }
+
+ private void processSessions(Set keys)
+ {
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isConnectable())
+ {
+ continue;
+ }
+
+ SocketChannel ch = (SocketChannel) key.channel();
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ newSession(ch, entry.handler, entry.config, entry);
+ success = true;
+ }
+ catch (Throwable e)
+ {
+ entry.setException(e);
+ }
+ finally
+ {
+ key.cancel();
+ if (!success)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions(Set keys)
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ if (currentTime >= entry.deadline)
+ {
+ entry.setException(new ConnectException());
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ key.cancel();
+ }
+ }
+ }
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ MultiThreadSocketSessionImpl session =
+ new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
+ config, ch, handler, ch.socket().getRemoteSocketAddress());
+
+ //new interface
+// SocketSessionImpl session = new SocketSessionImpl(
+// this, nextProcessor(), getListeners(),
+// config, ch, handler, ch.socket().getRemoteSocketAddress() );
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+ session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
+
+ // Forward the remaining process to the SocketIoProcessor.
+ session.getIoProcessor().addNew(session);
+ }
+
+ private MultiThreadSocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ private class Worker implements Runnable
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
+
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(1000);
+
+ registerNew();
+
+ if (nKeys > 0)
+ {
+ processSessions(selector.selectedKeys());
+ }
+
+ processTimedOutSessions(selector.keys());
+
+ if (selector.keys().isEmpty())
+ {
+ if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+ {
+ synchronized (lock)
+ {
+ if (selector.keys().isEmpty() &&
+ connectQueue.isEmpty())
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ private class ConnectionRequest extends DefaultConnectFuture
+ {
+ private final SocketChannel channel;
+ private final long deadline;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+
+ private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+ {
+ this.channel = channel;
+ long timeout;
+ if (config instanceof IoConnectorConfig)
+ {
+ timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+ }
+ else
+ {
+ timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+ }
+ this.deadline = System.currentTimeMillis() + timeout;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
new file mode 100644
index 0000000000..67b8c8d820
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ */
+class MultiThreadSocketFilterChain extends AbstractIoFilterChain {
+
+ MultiThreadSocketFilterChain( IoSession parent )
+ {
+ super( parent );
+ }
+
+ protected void doWrite( IoSession session, WriteRequest writeRequest )
+ {
+ MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+ Queue writeRequestQueue = s.getWriteRequestQueue();
+
+ // SocketIoProcessor.doFlush() will reset it after write is finished
+ // because the buffer will be passed with messageSent event.
+ ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.push( writeRequest );
+ if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
+ {
+ // Notify SocketIoProcessor only when writeRequestQueue was empty.
+ s.getIoProcessor().flush( s );
+ }
+ }
+ }
+
+ protected void doClose( IoSession session ) throws IOException
+ {
+ MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
+ s.getIoProcessor().remove( s );
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
new file mode 100644
index 0000000000..c23ad8686f
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.Queue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $,
+ */
+class MultiThreadSocketIoProcessor extends SocketIoProcessor
+{
+ Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
+ Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader");
+ Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer");
+
+ private static final long SELECTOR_TIMEOUT = 1000L;
+
+ private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
+ private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
+
+ private final Object readLock = new Object();
+ private final Object writeLock = new Object();
+
+ private final String threadName;
+ private final Executor executor;
+
+ private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private volatile Selector selector, writeSelector;
+
+ private final Queue newSessions = new Queue();
+ private final Queue removingSessions = new Queue();
+ private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
+ private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
+
+ private final Queue trafficControllingSessions = new Queue();
+
+ private ReadWorker readWorker;
+ private WriteWorker writeWorker;
+ private long lastIdleReadCheckTime = System.currentTimeMillis();
+ private long lastIdleWriteCheckTime = System.currentTimeMillis();
+
+ MultiThreadSocketIoProcessor(String threadName, Executor executor)
+ {
+ super(threadName, executor);
+ this.threadName = threadName;
+ this.executor = executor;
+ }
+
+ void addNew(SocketSessionImpl session) throws IOException
+ {
+ synchronized (newSessions)
+ {
+ newSessions.push(session);
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+ writeSelector.wakeup();
+ }
+
+ void remove(SocketSessionImpl session) throws IOException
+ {
+ scheduleRemove(session);
+ startupWorker();
+ selector.wakeup();
+ }
+
+ private void startupWorker() throws IOException
+ {
+ synchronized (readLock)
+ {
+ if (readWorker == null)
+ {
+ selector = Selector.open();
+ readWorker = new ReadWorker();
+ executor.execute(new NamePreservingRunnable(readWorker));
+ }
+ }
+
+ synchronized (writeLock)
+ {
+ if (writeWorker == null)
+ {
+ writeSelector = Selector.open();
+ writeWorker = new WriteWorker();
+ executor.execute(new NamePreservingRunnable(writeWorker));
+ }
+ }
+
+ }
+
+ void flush(SocketSessionImpl session)
+ {
+ scheduleFlush(session);
+ Selector selector = this.writeSelector;
+
+ if (selector != null)
+ {
+ selector.wakeup();
+ }
+ }
+
+ void updateTrafficMask(SocketSessionImpl session)
+ {
+ scheduleTrafficControl(session);
+ Selector selector = this.selector;
+ if (selector != null)
+ {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleRemove(SocketSessionImpl session)
+ {
+ synchronized (removingSessions)
+ {
+ removingSessions.push(session);
+ }
+ }
+
+ private void scheduleFlush(SocketSessionImpl session)
+ {
+ synchronized (flushingSessionsSet)
+ {
+ //if flushingSessions grows to contain Integer.MAX_VALUE sessions
+ // then this will fail.
+ if (flushingSessionsSet.add(session))
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+
+ private void scheduleTrafficControl(SocketSessionImpl session)
+ {
+ synchronized (trafficControllingSessions)
+ {
+ trafficControllingSessions.push(session);
+ }
+ }
+
+ private void doAddNewReader() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+
+ try
+ {
+
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector,
+ SelectionKey.OP_READ,
+ session));
+
+ //System.out.println("ReadDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+
+ private void doAddNewWrite() throws InterruptedException
+ {
+ if (newSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (newSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) newSessions.peek();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ ch.configureBlocking(false);
+ synchronized (flushingSessionsSet)
+ {
+ flushingSessionsSet.add(session);
+ }
+
+ session.setWriteSelectionKey(ch.register(writeSelector,
+ SelectionKey.OP_WRITE,
+ session));
+
+ //System.out.println("WriteDebug:"+"Awaiting Registration");
+ session.awaitRegistration();
+ sessionCreated(session);
+ }
+ catch (IOException e)
+ {
+
+ // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+ // and call ConnectFuture.setException().
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ }
+ }
+
+
+ private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ synchronized (newSessions)
+ {
+ if (!session.created())
+ {
+ _logger.debug("Popping new session");
+ newSessions.pop();
+
+ // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+ // in AbstractIoFilterChain.fireSessionOpened().
+ session.getServiceListeners().fireSessionCreated(session);
+
+ session.doneCreation();
+ }
+ }
+ }
+
+ private void doRemove()
+ {
+ if (removingSessions.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ synchronized (removingSessions)
+ {
+ session = (MultiThreadSocketSessionImpl) removingSessions.pop();
+ }
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = session.getChannel();
+ SelectionKey key = session.getReadSelectionKey();
+ SelectionKey writeKey = session.getWriteSelectionKey();
+
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.close() is called before addSession() is processed)
+ if (key == null || writeKey == null)
+ {
+ scheduleRemove(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid() || !writeKey.isValid())
+ {
+ continue;
+ }
+
+ try
+ {
+ //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
+ synchronized (readLock)
+ {
+ key.cancel();
+ }
+ synchronized (writeLock)
+ {
+ writeKey.cancel();
+ }
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ releaseWriteBuffers(session);
+ session.getServiceListeners().fireSessionDestroyed(session);
+ }
+ }
+ }
+
+ private void processRead(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
+
+ synchronized (readLock)
+ {
+ if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
+ {
+ read(session);
+ }
+ }
+
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void processWrite(Set selectedKeys)
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+
+ synchronized (writeLock)
+ {
+ if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
+ {
+
+ // Clear OP_WRITE
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ synchronized (flushingSessionsSet)
+ {
+ flushingSessions.offer(session);
+ }
+ }
+ }
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void read(SocketSessionImpl session)
+ {
+
+ //if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
+ }
+
+ int totalReadBytes = 0;
+
+ while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ buf.clear();
+
+ int readBytes = 0;
+ int ret;
+
+ try
+ {
+ while ((ret = ch.read(buf.buf())) > 0)
+ {
+ readBytes += ret;
+ totalReadBytes += ret;
+ }
+ }
+ finally
+ {
+ buf.flip();
+ }
+
+
+ if (readBytes > 0)
+ {
+ session.increaseReadBytes(readBytes);
+
+ session.getFilterChain().fireMessageReceived(session, buf);
+ buf = null;
+ }
+
+ if (ret <= 0)
+ {
+ if (ret == 0)
+ {
+ if (readBytes == session.getReadBufferSize())
+ {
+ continue;
+ }
+ }
+ else
+ {
+ scheduleRemove(session);
+ }
+
+ break;
+ }
+ }
+ catch (Throwable e)
+ {
+ if (e instanceof IOException)
+ {
+ scheduleRemove(session);
+ }
+ session.getFilterChain().fireExceptionCaught(session, e);
+
+ //Stop Reading this session.
+ return;
+ }
+ finally
+ {
+ if (buf != null)
+ {
+ buf.release();
+ }
+ }
+ }//for
+
+ // if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
+ }
+ }
+
+
+ private void notifyReadIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleReadCheckTime) >= 1000)
+ {
+ lastIdleReadCheckTime = currentTime;
+ Set keys = selector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyReadIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyWriteIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastIdleWriteCheckTime) >= 1000)
+ {
+ lastIdleWriteCheckTime = currentTime;
+ Set keys = writeSelector.keys();
+ if (keys != null)
+ {
+ for (Iterator it = keys.iterator(); it.hasNext();)
+ {
+ SelectionKey key = (SelectionKey) it.next();
+ SocketSessionImpl session = (SocketSessionImpl) key.attachment();
+ notifyWriteIdleness(session, currentTime);
+ }
+ }
+ }
+ }
+
+ private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+ IdleStatus.READER_IDLE,
+ Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+ IdleStatus.BOTH_IDLE,
+ Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+ IdleStatus.WRITER_IDLE,
+ Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+
+ notifyWriteTimeout(session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime());
+ }
+
+ private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime)
+ {
+ if (idleTime > 0 && lastIoTime != 0
+ && (currentTime - lastIoTime) >= idleTime)
+ {
+ session.increaseIdleCount(status);
+ session.getFilterChain().fireSessionIdle(session, status);
+ }
+ }
+
+ private void notifyWriteTimeout(SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime)
+ {
+
+ MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
+ SelectionKey key = sesh.getWriteSelectionKey();
+
+ synchronized (writeLock)
+ {
+ if (writeTimeout > 0
+ && (currentTime - lastIoTime) >= writeTimeout
+ && key != null && key.isValid()
+ && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+ {
+ session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+ }
+ }
+ }
+
+ private SocketSessionImpl getNextFlushingSession()
+ {
+ return (SocketSessionImpl) flushingSessions.poll();
+ }
+
+ private void releaseSession(SocketSessionImpl session)
+ {
+ synchronized (session.getWriteRequestQueue())
+ {
+ synchronized (flushingSessionsSet)
+ {
+ if (session.getScheduledWriteRequests() > 0)
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
+ }
+ flushingSessions.offer(session);
+ }
+ else
+ {
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
+ }
+ flushingSessionsSet.remove(session);
+ }
+ }
+ }
+ }
+
+ private void releaseWriteBuffers(SocketSessionImpl session)
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequest req;
+
+ //Should this be synchronized?
+ synchronized (writeRequestQueue)
+ {
+ while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
+ {
+ try
+ {
+ ((ByteBuffer) req.getMessage()).release();
+ }
+ catch (IllegalStateException e)
+ {
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+ finally
+ {
+ req.getFuture().setWritten(false);
+ }
+ }
+ }
+ }
+
+ private void doFlush()
+ {
+ MultiThreadSocketSessionImpl session;
+
+ while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
+ {
+ if (!session.isConnected())
+ {
+ releaseWriteBuffers(session);
+ releaseSession(session);
+ continue;
+ }
+
+ SelectionKey key = session.getWriteSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleFlush(session);
+ releaseSession(session);
+ continue;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ releaseSession(session);
+ continue;
+ }
+
+ try
+ {
+ if (doFlush(session))
+ {
+ releaseSession(session);
+ }
+ }
+ catch (IOException e)
+ {
+ releaseSession(session);
+ scheduleRemove(session);
+ session.getFilterChain().fireExceptionCaught(session, e);
+ }
+
+ }
+
+ }
+
+ private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
+ {
+ MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
+ // Clear OP_WRITE
+ SelectionKey key = session.getWriteSelectionKey();
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ }
+ SocketChannel ch = session.getChannel();
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ long totalFlushedBytes = 0;
+ while (true)
+ {
+ WriteRequest req;
+
+ synchronized (writeRequestQueue)
+ {
+ req = (WriteRequest) writeRequestQueue.first();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0)
+ {
+ synchronized (writeRequestQueue)
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+
+ int writtenBytes = 0;
+
+ // Reported as DIRMINA-362
+ //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+ if (key.isWritable())
+ {
+ writtenBytes = ch.write(buf.buf());
+ totalFlushedBytes += writtenBytes;
+ }
+
+ if (writtenBytes > 0)
+ {
+ session.increaseWrittenBytes(writtenBytes);
+ }
+
+ if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
+ {
+ // Kernel buffer is full
+ synchronized (writeLock)
+ {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ }
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return false;
+ }
+ }
+
+ if (_loggerWrite.isDebugEnabled())
+ {
+ //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
+ }
+ return true;
+ }
+
+ private void doUpdateTrafficMask()
+ {
+ if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
+ {
+ return;
+ }
+
+ // Synchronize over entire operation as this method should be called
+ // from both read and write thread and we don't want the order of the
+ // updates to get changed.
+ trafficMaskUpdateLock.lock();
+ try
+ {
+ for (; ;)
+ {
+ MultiThreadSocketSessionImpl session;
+
+ session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
+
+ if (session == null)
+ {
+ break;
+ }
+
+ SelectionKey key = session.getReadSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if (key == null)
+ {
+ scheduleTrafficControl(session);
+ break;
+ }
+ // skip if channel is already closed
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, set OP_WRITE to trigger flushing.
+
+ //Sset to Read and Write if there is nothing then the cost
+ // is one loop through the flusher.
+ int ops = SelectionKey.OP_READ;
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getTrafficMask().getInterestOps();
+ synchronized (readLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ //Change key to the WriteSelection Key
+ key = session.getWriteSelectionKey();
+ if (key != null && key.isValid())
+ {
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized (writeRequestQueue)
+ {
+ if (!writeRequestQueue.isEmpty())
+ {
+ ops = SelectionKey.OP_WRITE;
+ synchronized (writeLock)
+ {
+ key.interestOps(ops & mask);
+ }
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ trafficMaskUpdateLock.unlock();
+ }
+
+ }
+
+ private class WriteWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
+
+ //System.out.println("WriteDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
+
+ doAddNewWrite();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
+ processWrite(writeSelector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("WriteDebug:"+"No keys from writeselector");
+ }
+
+ doRemove();
+ notifyWriteIdleness();
+
+ if (flushingSessionsSet.size() > 0)
+ {
+ doFlush();
+ }
+
+ if (writeSelector.keys().isEmpty())
+ {
+ synchronized (writeLock)
+ {
+
+ if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ writeWorker = null;
+ try
+ {
+ writeSelector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ writeSelector = null;
+ }
+
+ break;
+ }
+ }
+ }
+
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("WriteDebug:"+"Shutdown");
+ }
+
+ }
+
+ private class ReadWorker implements Runnable
+ {
+
+ public void run()
+ {
+ Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
+
+ //System.out.println("ReadDebug:"+"Startup");
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(SELECTOR_TIMEOUT);
+
+ doAddNewReader();
+ doUpdateTrafficMask();
+
+ if (nKeys > 0)
+ {
+ //System.out.println("ReadDebug:"+nKeys + " keys from selector");
+
+ processRead(selector.selectedKeys());
+ }
+ else
+ {
+ //System.out.println("ReadDebug:"+"No keys from selector");
+ }
+
+
+ doRemove();
+ notifyReadIdleness();
+
+ if (selector.keys().isEmpty())
+ {
+
+ synchronized (readLock)
+ {
+ if (selector.keys().isEmpty() && newSessions.isEmpty())
+ {
+ readWorker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+
+ break;
+ }
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(t);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ //System.out.println("ReadDebug:"+"Shutdown");
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
new file mode 100644
index 0000000000..043d4800b6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * An {@link IoConnectorConfig} for {@link SocketConnector}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl
+{
+ private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+ private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+ private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false;
+ private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false;
+
+ private static boolean DEFAULT_REUSE_ADDRESS;
+ private static int DEFAULT_RECEIVE_BUFFER_SIZE;
+ private static int DEFAULT_SEND_BUFFER_SIZE;
+ private static int DEFAULT_TRAFFIC_CLASS;
+ private static boolean DEFAULT_KEEP_ALIVE;
+ private static boolean DEFAULT_OOB_INLINE;
+ private static int DEFAULT_SO_LINGER;
+ private static boolean DEFAULT_TCP_NO_DELAY;
+
+ static
+ {
+ initialize();
+ }
+
+ private static void initialize()
+ {
+ Socket socket = null;
+
+ socket = new Socket();
+
+ try
+ {
+ DEFAULT_REUSE_ADDRESS = socket.getReuseAddress();
+ DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize();
+ DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize();
+ DEFAULT_KEEP_ALIVE = socket.getKeepAlive();
+ DEFAULT_OOB_INLINE = socket.getOOBInline();
+ DEFAULT_SO_LINGER = socket.getSoLinger();
+ DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay();
+
+ // Check if setReceiveBufferSize is supported.
+ try
+ {
+ socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE);
+ SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
+ }
+
+ // Check if setSendBufferSize is supported.
+ try
+ {
+ socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE);
+ SET_SEND_BUFFER_SIZE_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ SET_SEND_BUFFER_SIZE_AVAILABLE = false;
+ }
+
+ // Check if getTrafficClass is supported.
+ try
+ {
+ DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass();
+ GET_TRAFFIC_CLASS_AVAILABLE = true;
+ }
+ catch( SocketException e )
+ {
+ GET_TRAFFIC_CLASS_AVAILABLE = false;
+ DEFAULT_TRAFFIC_CLASS = 0;
+ }
+ }
+ catch( SocketException e )
+ {
+ throw new ExceptionInInitializerError(e);
+ }
+ finally
+ {
+ if( socket != null )
+ {
+ try
+ {
+ socket.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ public static boolean isSetReceiveBufferSizeAvailable() {
+ return SET_RECEIVE_BUFFER_SIZE_AVAILABLE;
+ }
+
+ public static boolean isSetSendBufferSizeAvailable() {
+ return SET_SEND_BUFFER_SIZE_AVAILABLE;
+ }
+
+ public static boolean isGetTrafficClassAvailable() {
+ return GET_TRAFFIC_CLASS_AVAILABLE;
+ }
+
+ public static boolean isSetTrafficClassAvailable() {
+ return SET_TRAFFIC_CLASS_AVAILABLE;
+ }
+
+ private boolean reuseAddress = DEFAULT_REUSE_ADDRESS;
+ private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+ private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+ private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+ private boolean keepAlive = DEFAULT_KEEP_ALIVE;
+ private boolean oobInline = DEFAULT_OOB_INLINE;
+ private int soLinger = DEFAULT_SO_LINGER;
+ private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+ /**
+ * Creates a new instance.
+ */
+ MultiThreadSocketSessionConfigImpl()
+ {
+ }
+
+ public boolean isReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ public void setReuseAddress( boolean reuseAddress )
+ {
+ this.reuseAddress = reuseAddress;
+ }
+
+ public int getReceiveBufferSize()
+ {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize( int receiveBufferSize )
+ {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getSendBufferSize()
+ {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize( int sendBufferSize )
+ {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getTrafficClass()
+ {
+ return trafficClass;
+ }
+
+ public void setTrafficClass( int trafficClass )
+ {
+ this.trafficClass = trafficClass;
+ }
+
+ public boolean isKeepAlive()
+ {
+ return keepAlive;
+ }
+
+ public void setKeepAlive( boolean keepAlive )
+ {
+ this.keepAlive = keepAlive;
+ }
+
+ public boolean isOobInline()
+ {
+ return oobInline;
+ }
+
+ public void setOobInline( boolean oobInline )
+ {
+ this.oobInline = oobInline;
+ }
+
+ public int getSoLinger()
+ {
+ return soLinger;
+ }
+
+ public void setSoLinger( int soLinger )
+ {
+ this.soLinger = soLinger;
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay( boolean tcpNoDelay )
+ {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
new file mode 100644
index 0000000000..be4a2d289d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.IoServiceListenerSupport;
+import org.apache.mina.util.Queue;
+
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link IoSession} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+class MultiThreadSocketSessionImpl extends SocketSessionImpl
+{
+ private final IoService manager;
+ private final IoServiceConfig serviceConfig;
+ private final SocketSessionConfig config = new SessionConfigImpl();
+ private final MultiThreadSocketIoProcessor ioProcessor;
+ private final MultiThreadSocketFilterChain filterChain;
+ private final SocketChannel ch;
+ private final Queue writeRequestQueue;
+ private final IoHandler handler;
+ private final SocketAddress remoteAddress;
+ private final SocketAddress localAddress;
+ private final SocketAddress serviceAddress;
+ private final IoServiceListenerSupport serviceListeners;
+ private SelectionKey readKey, writeKey;
+ private int readBufferSize;
+ private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
+ private AtomicBoolean created = new AtomicBoolean(false);
+
+ /**
+ * Creates a new instance.
+ */
+ MultiThreadSocketSessionImpl( IoService manager,
+ SocketIoProcessor ioProcessor,
+ IoServiceListenerSupport listeners,
+ IoServiceConfig serviceConfig,
+ SocketChannel ch,
+ IoHandler defaultHandler,
+ SocketAddress serviceAddress )
+ {
+ super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress);
+ this.manager = manager;
+ this.serviceListeners = listeners;
+ this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor;
+ this.filterChain = new MultiThreadSocketFilterChain(this);
+ this.ch = ch;
+ this.writeRequestQueue = new Queue();
+ this.handler = defaultHandler;
+ this.remoteAddress = ch.socket().getRemoteSocketAddress();
+ this.localAddress = ch.socket().getLocalSocketAddress();
+ this.serviceAddress = serviceAddress;
+ this.serviceConfig = serviceConfig;
+
+ // Apply the initial session settings
+ IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
+ if( sessionConfig instanceof SocketSessionConfig )
+ {
+ SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig;
+ this.config.setKeepAlive( cfg.isKeepAlive() );
+ this.config.setOobInline( cfg.isOobInline() );
+ this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() );
+ this.readBufferSize = cfg.getReceiveBufferSize();
+ this.config.setReuseAddress( cfg.isReuseAddress() );
+ this.config.setSendBufferSize( cfg.getSendBufferSize() );
+ this.config.setSoLinger( cfg.getSoLinger() );
+ this.config.setTcpNoDelay( cfg.isTcpNoDelay() );
+
+ if( this.config.getTrafficClass() != cfg.getTrafficClass() )
+ {
+ this.config.setTrafficClass( cfg.getTrafficClass() );
+ }
+ }
+ }
+
+ void awaitRegistration() throws InterruptedException
+ {
+ registeredReadyLatch.countDown();
+
+ registeredReadyLatch.await();
+ }
+
+ boolean created() throws InterruptedException
+ {
+ return created.get();
+ }
+
+ void doneCreation()
+ {
+ created.getAndSet(true);
+ }
+
+ public IoService getService()
+ {
+ return manager;
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return serviceConfig;
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return config;
+ }
+
+ SocketIoProcessor getIoProcessor()
+ {
+ return ioProcessor;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filterChain;
+ }
+
+ SocketChannel getChannel()
+ {
+ return ch;
+ }
+
+ IoServiceListenerSupport getServiceListeners()
+ {
+ return serviceListeners;
+ }
+
+ SelectionKey getSelectionKey()
+ {
+ return readKey;
+ }
+
+ SelectionKey getReadSelectionKey()
+ {
+ return readKey;
+ }
+
+ SelectionKey getWriteSelectionKey()
+ {
+ return writeKey;
+ }
+
+ void setSelectionKey(SelectionKey key)
+ {
+ this.readKey = key;
+ }
+
+ void setWriteSelectionKey(SelectionKey key)
+ {
+ this.writeKey = key;
+ }
+
+ public IoHandler getHandler()
+ {
+ return handler;
+ }
+
+ protected void close0()
+ {
+ filterChain.fireFilterClose( this );
+ }
+
+ Queue getWriteRequestQueue()
+ {
+ return writeRequestQueue;
+ }
+
+ /**
+ @return int Number of write scheduled write requests
+ @deprecated
+ */
+ public int getScheduledWriteMessages()
+ {
+ return getScheduledWriteRequests();
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.size();
+ }
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.byteSize();
+ }
+ }
+
+ protected void write0( WriteRequest writeRequest )
+ {
+ filterChain.fireFilterWrite( this, writeRequest );
+ }
+
+ public TransportType getTransportType()
+ {
+ return TransportType.SOCKET;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ //This is what I had previously
+// return ch.socket().getRemoteSocketAddress();
+ return remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ //This is what I had previously
+// return ch.socket().getLocalSocketAddress();
+ return localAddress;
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return serviceAddress;
+ }
+
+ protected void updateTrafficMask()
+ {
+ this.ioProcessor.updateTrafficMask( this );
+ }
+
+ int getReadBufferSize()
+ {
+ return readBufferSize;
+ }
+
+ private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
+ {
+ public boolean isKeepAlive()
+ {
+ try
+ {
+ return ch.socket().getKeepAlive();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setKeepAlive( boolean on )
+ {
+ try
+ {
+ ch.socket().setKeepAlive( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isOobInline()
+ {
+ try
+ {
+ return ch.socket().getOOBInline();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setOobInline( boolean on )
+ {
+ try
+ {
+ ch.socket().setOOBInline( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isReuseAddress()
+ {
+ try
+ {
+ return ch.socket().getReuseAddress();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setReuseAddress( boolean on )
+ {
+ try
+ {
+ ch.socket().setReuseAddress( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public int getSoLinger()
+ {
+ try
+ {
+ return ch.socket().getSoLinger();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setSoLinger( int linger )
+ {
+ try
+ {
+ if( linger < 0 )
+ {
+ ch.socket().setSoLinger( false, 0 );
+ }
+ else
+ {
+ ch.socket().setSoLinger( true, linger );
+ }
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ try
+ {
+ return ch.socket().getTcpNoDelay();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setTcpNoDelay( boolean on )
+ {
+ try
+ {
+ ch.socket().setTcpNoDelay( on );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public int getTrafficClass()
+ {
+ if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
+ {
+ try
+ {
+ return ch.socket().getTrafficClass();
+ }
+ catch( SocketException e )
+ {
+ // Throw an exception only when setTrafficClass is also available.
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ public void setTrafficClass( int tc )
+ {
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ try
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ public int getSendBufferSize()
+ {
+ try
+ {
+ return ch.socket().getSendBufferSize();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setSendBufferSize( int size )
+ {
+ if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setSendBufferSize( size );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+
+ public int getReceiveBufferSize()
+ {
+ try
+ {
+ return ch.socket().getReceiveBufferSize();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+
+ public void setReceiveBufferSize( int size )
+ {
+ if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setReceiveBufferSize( size );
+ MultiThreadSocketSessionImpl.this.readBufferSize = size;
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
new file mode 100644
index 0000000000..a23e546af5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.transport.vmpipe.support.VmPipe;
+import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
+import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
+import org.apache.mina.util.AnonymousSocketAddress;
+
+/**
+ * Connects to {@link IoHandler}s which is bound on the specified
+ * {@link VmPipeAddress}.
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
+ */
+public class QpidVmPipeConnector extends VmPipeConnector
+{
+ private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
+ private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig()
+ {
+ public IoSessionConfig getSessionConfig()
+ {
+ return CONFIG;
+ }
+ };
+
+ /**
+ * Creates a new instance.
+ */
+ public QpidVmPipeConnector()
+ {
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ return connect( address, null, handler, config );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config )
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+ if( ! ( address instanceof VmPipeAddress ) )
+ throw new IllegalArgumentException(
+ "address must be VmPipeAddress." );
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address );
+ if( entry == null )
+ {
+ return DefaultConnectFuture.newFailedFuture(
+ new IOException( "Endpoint unavailable: " + address ) );
+ }
+
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ VmPipeSessionImpl localSession =
+ new VmPipeSessionImpl(
+ this,
+ config,
+ getListeners(),
+ new Object(), // lock
+ new AnonymousSocketAddress(),
+ handler,
+ entry );
+
+ // initialize acceptor session
+ VmPipeSessionImpl remoteSession = localSession.getRemoteSession();
+ try
+ {
+ IoFilterChain filterChain = remoteSession.getFilterChain();
+ entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain );
+ entry.getConfig().getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ entry.getListeners().fireSessionCreated( remoteSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ remoteSession.close();
+ }
+
+
+ // initialize connector session
+ try
+ {
+ IoFilterChain filterChain = localSession.getFilterChain();
+ this.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getFilterChainBuilder().buildFilterChain( filterChain );
+ config.getThreadModel().buildFilterChain( filterChain );
+
+ // The following sentences don't throw any exceptions.
+ localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
+ getListeners().fireSessionCreated( localSession );
+ VmPipeIdleStatusChecker.getInstance().addSession( localSession);
+ }
+ catch( Throwable t )
+ {
+ future.setException( t );
+ }
+
+
+
+ return future;
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index 2f6290b55a..ef9420ba87 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -54,7 +54,6 @@ public class AMQChannelException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
- return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), getMessageAsShortString(),_classId,_methodId));
+ return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
}
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index ca9c9f9dc4..8ef6facef1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -62,10 +62,9 @@ public class AMQConnectionException extends AMQException
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
return new AMQFrame(0,
reg.createConnectionCloseBody(getErrorCode().getCode(),
- getMessageAsShortString(),
+ new AMQShortString(getMessage()),
_classId,
_methodId));
}
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java
index 86d439d269..b0c6fccc9e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
/**
@@ -122,19 +121,4 @@ public class AMQException extends Exception
return newAMQE;
}
-
- /**
- * Truncates the exception message to 255 characters if its length exceeds 255.
- *
- * @return exception message
- */
- public AMQShortString getMessageAsShortString()
- {
- String message = getMessage();
- if (message != null && message.length() > AMQShortString.MAX_LENGTH)
- {
- message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "...";
- }
- return new AMQShortString(message);
- }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
new file mode 100644
index 0000000000..5423bbb68f
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.mina.MinaHandler;
+
+import static org.apache.qpid.transport.util.Functions.str;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * ToyBroker
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ToyBroker extends SessionDelegate
+{
+
+ private ToyExchange exchange;
+ private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
+
+ public ToyBroker(ToyExchange exchange)
+ {
+ this.exchange = exchange;
+ }
+
+ public void messageAcquire(Session context, MessageAcquire struct)
+ {
+ System.out.println("\n==================> messageAcquire " );
+ context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
+ }
+
+ @Override public void queueDeclare(Session ssn, QueueDeclare qd)
+ {
+ exchange.createQueue(qd.getQueue());
+ System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
+ }
+
+ @Override public void exchangeBind(Session ssn, ExchangeBind qb)
+ {
+ exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
+ System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
+ }
+
+ @Override public void queueQuery(Session ssn, QueueQuery qq)
+ {
+ QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
+ ssn.executionResult((int) qq.getId(), result);
+ }
+
+ @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
+ {
+ Consumer c = new Consumer();
+ c._queueName = ms.getQueue();
+ consumers.put(ms.getDestination(),c);
+ System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
+ }
+
+ @Override public void messageFlow(Session ssn,MessageFlow struct)
+ {
+ Consumer c = consumers.get(struct.getDestination());
+ c._credit = struct.getValue();
+ System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
+ }
+
+ @Override public void messageFlush(Session ssn,MessageFlush struct)
+ {
+ System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
+ checkAndSendMessagesToConsumer(ssn,struct.getDestination());
+ }
+
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ String dest = xfr.getDestination();
+ System.out.println("received transfer " + dest);
+ Header header = xfr.getHeader();
+ DeliveryProperties props = header.get(DeliveryProperties.class);
+ if (props != null)
+ {
+ System.out.println("received headers routing_key " + props.getRoutingKey());
+ }
+
+ MessageProperties mp = header.get(MessageProperties.class);
+ System.out.println("MP: " + mp);
+ if (mp != null)
+ {
+ System.out.println(mp.getApplicationHeaders());
+ }
+
+ if (exchange.route(dest,props == null ? null : props.getRoutingKey(),xfr))
+ {
+ System.out.println("queued " + xfr);
+ dispatchMessages(ssn);
+ }
+ else
+ {
+
+ if (props == null || !props.getDiscardUnroutable())
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(xfr.getId());
+ ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+ "no such destination");
+ }
+ }
+ ssn.processed(xfr);
+ }
+
+ private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
+ {
+ System.out.println("\n==================> Transfering message to: " +dest + "\n");
+ ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ m.getHeader(), m.getBody());
+ }
+
+ private void dispatchMessages(Session ssn)
+ {
+ for (String dest: consumers.keySet())
+ {
+ checkAndSendMessagesToConsumer(ssn,dest);
+ }
+ }
+
+ private void checkAndSendMessagesToConsumer(Session ssn,String dest)
+ {
+ Consumer c = consumers.get(dest);
+ LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
+ MessageTransfer m = queue.poll();
+ while (m != null && c._credit>0)
+ {
+ transferMessageToPeer(ssn,dest,m);
+ c._credit--;
+ m = queue.poll();
+ }
+ }
+
+ // ugly, but who cares :)
+ // assumes unit is always no of messages, not bytes
+ // assumes it's credit mode and not window
+ private static class Consumer
+ {
+ long _credit;
+ String _queueName;
+ }
+
+ private static final class ToyBrokerSession extends Session
+ {
+
+ public ToyBrokerSession(Connection connection, Binary name, long expiry, ToyExchange exchange)
+ {
+ super(connection, new ToyBroker(exchange), name, expiry);
+ }
+ }
+
+ public static final void main(String[] args) throws IOException
+ {
+ final ToyExchange exchange = new ToyExchange();
+ ConnectionDelegate delegate = new ServerDelegate()
+ {
+ @Override
+ public void init(Connection conn, ProtocolHeader hdr)
+ {
+ conn.setSessionFactory(new Connection.SessionFactory()
+ {
+ public Session newSession(Connection conn, Binary name, long expiry)
+ {
+ return new ToyBrokerSession(conn, name, expiry, exchange);
+ }
+ });
+
+ super.init(conn, hdr); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ };
+
+ MinaHandler.accept("0.0.0.0", 5672, delegate);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
new file mode 100644
index 0000000000..5b2db10613
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid;
+
+import java.nio.*;
+import java.util.*;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.mina.MinaHandler;
+
+
+/**
+ * ToyClient
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ToyClient implements SessionListener
+{
+ public void opened(Session ssn) {}
+
+ public void resumed(Session ssn) {}
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ System.out.println("msg: " + xfr);
+ }
+
+ public void closed(Session ssn) {}
+
+ public static final void main(String[] args)
+ {
+ Connection conn = new Connection();
+ conn.connect("0.0.0.0", 5672, null, "guest", "guest", false);
+ Session ssn = conn.createSession();
+ ssn.setSessionListener(new ToyClient());
+
+ ssn.queueDeclare("asdf", null, null);
+ ssn.sync();
+
+ Map<String,Object> nested = new LinkedHashMap<String,Object>();
+ nested.put("list", Arrays.asList("one", "two", "three"));
+ Map<String,Object> map = new LinkedHashMap<String,Object>();
+
+ map.put("str", "this is a string");
+
+ map.put("+int", 3);
+ map.put("-int", -3);
+ map.put("maxint", Integer.MAX_VALUE);
+ map.put("minint", Integer.MIN_VALUE);
+
+ map.put("+short", (short) 1);
+ map.put("-short", (short) -1);
+ map.put("maxshort", (short) Short.MAX_VALUE);
+ map.put("minshort", (short) Short.MIN_VALUE);
+
+ map.put("float", (float) 3.3);
+ map.put("double", 4.9);
+ map.put("char", 'c');
+
+ map.put("table", nested);
+ map.put("list", Arrays.asList(1, 2, 3));
+ map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
+
+ ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(new DeliveryProperties(),
+ new MessageProperties()
+ .setApplicationHeaders(map)),
+ "this is the data");
+
+ ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null,
+ "this should be rejected");
+ ssn.sync();
+
+ Future<QueueQueryResult> future = ssn.queueQuery("asdf");
+ System.out.println(future.get().getQueue());
+ ssn.sync();
+ ssn.close();
+ conn.close();
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
new file mode 100644
index 0000000000..da6aed9629
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
@@ -0,0 +1,154 @@
+package org.apache.qpid;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.qpid.transport.MessageTransfer;
+
+
+public class ToyExchange
+{
+ final static String DIRECT = "amq.direct";
+ final static String TOPIC = "amq.topic";
+
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+ private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>();
+
+ public void createQueue(String name)
+ {
+ queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
+ }
+
+ public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
+ {
+ return queues.get(name);
+ }
+
+ public void bindQueue(String type,String binding,String queueName)
+ {
+ LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
+ binding = normalizeKey(binding);
+ if(DIRECT.equals(type))
+ {
+
+ if (directEx.containsKey(binding))
+ {
+ List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
+ list.add(queue);
+ directEx.put(binding,list);
+ }
+ }
+ else
+ {
+ if (topicEx.containsKey(binding))
+ {
+ List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
+ list.add(queue);
+ topicEx.put(binding,list);
+ }
+ }
+ }
+
+ public boolean route(String dest, String routingKey, MessageTransfer msg)
+ {
+ List<LinkedBlockingQueue<MessageTransfer>> queues;
+ if(DIRECT.equals(dest))
+ {
+ queues = directEx.get(routingKey);
+ }
+ else
+ {
+ queues = matchWildCard(routingKey);
+ }
+ if(queues != null && queues.size()>0)
+ {
+ System.out.println("Message stored in " + queues.size() + " queues");
+ storeMessage(msg,queues);
+ return true;
+ }
+ else
+ {
+ System.out.println("Message unroutable " + msg);
+ return false;
+ }
+ }
+
+ private String normalizeKey(String routingKey)
+ {
+ if(routingKey.indexOf(".*")>1)
+ {
+ return routingKey.substring(0,routingKey.indexOf(".*"));
+ }
+ else
+ {
+ return routingKey;
+ }
+ }
+
+ private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
+ {
+ List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
+
+ for(String key: topicEx.keySet())
+ {
+ Pattern p = Pattern.compile(key);
+ Matcher m = p.matcher(routingKey);
+ if (m.find())
+ {
+ for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
+ {
+ selected.add(queue);
+ }
+ }
+ }
+
+ return selected;
+ }
+
+ private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
+ {
+ for(LinkedBlockingQueue<MessageTransfer> queue : selected)
+ {
+ queue.offer(msg);
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 36f9a1ae2d..0dd21238a7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
-
+
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -68,40 +68,67 @@ public class ClientProperties
* by the broker in TuneOK it will be used as the heartbeat interval.
* If not a warning will be printed and the max value specified for
* heartbeat in TuneOK will be used
- *
+ *
* The default idle timeout is set to 120 secs
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-
+
public static final String HEARTBEAT = "qpid.heartbeat";
public static final int HEARTBEAT_DEFAULT = 120;
-
+
/**
* This value will be used to determine the default destination syntax type.
* Currently the two types are Binding URL (java only) and the Addressing format (used by
- * all clients).
+ * all clients).
*/
public static final String DEST_SYNTAX = "qpid.dest_syntax";
-
+
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
- public static final String AMQP_VERSION = "qpid.amqp.version";
+ /**
+ * ==========================================================
+ * Those properties are used when the io size should be bounded
+ * ==========================================================
+ */
- public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
+ /**
+ * When set to true the io layer throttle down producers and consumers
+ * when written or read messages are accumulating and exceeding a certain size.
+ * This is especially useful when a the producer rate is greater than the network
+ * speed.
+ * type: boolean
+ */
+ public static final String PROTECTIO_PROP_NAME = "protectio";
- private static ClientProperties _instance = new ClientProperties();
+ //=== The following properties are only used when the previous one is true.
+ /**
+ * Max size of read messages that can be stored within the MINA layer
+ * type: int
+ */
+ public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+ public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+ /**
+ * Max size of written messages that can be stored within the MINA layer
+ * type: int
+ */
+ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+ public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final String AMQP_VERSION = "qpid.amqp.version";
+
+ private static ClientProperties _instance = new ClientProperties();
+
/*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
+ public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
+
public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
+
+
public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
+
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index 2b9e2ffaba..39a9beb9e8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -37,10 +37,6 @@ import java.lang.ref.WeakReference;
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
- /**
- * The maximum number of octets in AMQ short string as defined in AMQP specification
- */
- public static final int MAX_LENGTH = 255;
private static final byte MINUS = (byte)'-';
private static final byte ZERO = (byte) '0';
@@ -122,19 +118,22 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString(byte[] data)
{
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null data[]");
- }
- if (data.length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
+
_data = data.clone();
_length = data.length;
_offset = 0;
}
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
+ }
+
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
@@ -147,12 +146,7 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
{
throw new NullPointerException("Cannot create AMQShortString with null char[]");
}
- // the current implementation of 0.8/0.9.x short string encoding
- // supports only ASCII characters
- if (data.length> MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
+
final int length = data.length;
final byte[] stringBytes = new byte[length];
int hash = 0;
@@ -171,17 +165,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
public AMQShortString(CharSequence charSequence)
{
- if (charSequence == null)
- {
- // it should be possible to create short string for null data
- charSequence = "";
- }
- // the current implementation of 0.8/0.9.x short string encoding
- // supports only ASCII characters
- if (charSequence.length() > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
final int length = charSequence.length();
final byte[] stringBytes = new byte[length];
int hash = 0;
@@ -201,10 +184,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private AMQShortString(ByteBuffer data, final int length)
{
- if (length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
if(data.isDirect() || data.isReadOnly())
{
byte[] dataBytes = new byte[length];
@@ -226,17 +205,8 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
private AMQShortString(final byte[] data, final int from, final int to)
{
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null data[]");
- }
- int length = to - from;
- if (length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
_offset = from;
- _length = length;
+ _length = to - from;
_data = data;
}
@@ -275,6 +245,29 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return new CharSubSequence(start, end);
}
+ public int writeToByteArray(byte[] encoding, int pos)
+ {
+ final int size = length();
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
+ }
+
+ public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
+ {
+
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return shortString;
+ }
+ }
+
public static AMQShortString readFromBuffer(ByteBuffer buffer)
{
final short length = buffer.getUnsigned();
@@ -697,10 +690,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
size += term.length();
}
- if (size > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
byte[] data = new byte[size];
int pos = 0;
final byte[] delimData = delim._data;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 30db3b8be7..83e5a7e341 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -36,7 +36,7 @@ public class ContentHeaderBody implements AMQBody
public long bodySize;
/** must never be null */
- private ContentHeaderProperties properties;
+ public ContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -128,14 +128,4 @@ public class ContentHeaderBody implements AMQBody
{
return new AMQFrame(channelId, body);
}
-
- public ContentHeaderProperties getProperties()
- {
- return properties;
- }
-
- public void setProperties(ContentHeaderProperties props)
- {
- properties = props;
- }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 48a3df734a..31953ea6ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -22,6 +22,8 @@ package org.apache.qpid.protocol;
import java.net.SocketAddress;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Receiver;
/**
@@ -30,6 +32,9 @@ import org.apache.qpid.transport.Receiver;
*/
public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
{
+ // Sets the network driver providing data for this ProtocolEngine
+ void setNetworkDriver (NetworkDriver driver);
+
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
index 4e40b78440..9df84eef90 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.protocol;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine(NetworkConnection network);
+ ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
index 30010a2d89..38f60c04fe 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
@@ -23,7 +23,7 @@ package org.apache.qpid.thread;
import org.apache.qpid.thread.Threading;
-import java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
public class QpidThreadExecutor implements Executor
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index c8b7ad2a5e..0d9f8c0b28 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -247,7 +247,7 @@ public class ClientDelegate extends ConnectionDelegate
int i = heartbeat;
if (i == 0)
{
- log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
+ log.warn("Idle timeout is zero. Heartbeats are disabled");
return 0; // heartbeats are disabled.
}
else if (i >= min && i <= max)
@@ -256,8 +256,8 @@ public class ClientDelegate extends ConnectionDelegate
}
else
{
- log.info("The broker does not support the configured connection idle timeout of %s sec," +
- " using the brokers max supported value of %s sec instead.", i,max);
+ log.warn("Ignoring the idle timeout %s set by the connection," +
+ " using the brokers max value %s", i,max);
return max;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7d17397b2d..e5e10c0e07 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -25,10 +25,9 @@ import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -40,13 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
@@ -120,6 +112,7 @@ public class Connection extends ConnectionInvoker
private SaslServer saslServer;
private SaslClient saslClient;
private int idleTimeout = 0;
+ private String _authorizationID;
private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
@@ -241,14 +234,13 @@ public class Connection extends ConnectionInvoker
state = OPENING;
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
-
- securityLayer = new SecurityLayer(this);
-
- OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
- Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
- NetworkConnection network = transport.connect(settings, receiver, null);
- sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
-
+
+ TransportBuilder transport = new TransportBuilder();
+ transport.init(this);
+ this.sender = transport.buildSenderPipe();
+ transport.buildReceiverPipe(this);
+ this.securityLayer = transport.getSecurityLayer();
+
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -474,12 +466,11 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- List <Binary> transactedSessions = new ArrayList();
for (Session ssn : sessions.values())
{
if (ssn.isTransacted())
- {
- transactedSessions.add(ssn.getName());
+ {
+ removeSession(ssn);
ssn.setState(Session.State.CLOSED);
}
else
@@ -489,11 +480,6 @@ public class Connection extends ConnectionInvoker
ssn.resume();
}
}
-
- for (Binary ssn_name : transactedSessions)
- {
- sessions.remove(ssn_name);
- }
setState(OPEN);
}
}
@@ -659,6 +645,16 @@ public class Connection extends ConnectionInvoker
return idleTimeout;
}
+ public void setAuthorizationID(String authorizationID)
+ {
+ _authorizationID = authorizationID;
+ }
+
+ public String getAuthorizationID()
+ {
+ return _authorizationID;
+ }
+
public String getUserID()
{
return userID;
@@ -699,8 +695,4 @@ public class Connection extends ConnectionInvoker
return connectionLost.get();
}
- protected Collection<Session> getChannels()
- {
- return channels.values();
- }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index f183c1e241..88dd2d6afa 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -95,7 +95,6 @@ public abstract class ConnectionDelegate
Session ssn = conn.getSession(dtc.getChannel());
if (ssn != null)
{
- ssn.setDetachCode(dtc.getCode());
conn.unmap(ssn);
ssn.closed();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
index 2074c77a5b..08678b213b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -30,8 +30,6 @@ import java.util.Map;
*/
public class ConnectionSettings
{
- public static final String WILDCARD_ADDRESS = "*";
-
String protocol = "tcp";
String host = "localhost";
String vhost;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
new file mode 100644
index 0000000000..86af97bf7e
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
+{
+ // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
+ // it using the SSLContextFactory if provided
+ void open(int port, InetAddress destination, ProtocolEngine engine,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
+ throws OpenException;
+
+ // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
+ // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
+ // (in the case of an SSLContextFactory, if provided)
+ void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
+
+ // Returns the remote address of the underlying socket
+ SocketAddress getRemoteAddress();
+
+ // Returns the local address of the underlying socket
+ SocketAddress getLocalAddress();
+
+ /**
+ * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been
+ * read in seconds
+ */
+ void setMaxReadIdle(int idleTime);
+
+ /**
+ * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been
+ * written in seconds
+ */
+ void setMaxWriteIdle(int idleTime);
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
index 8d3f7a779a..c38afe5dd5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
@@ -25,22 +25,20 @@ package org.apache.qpid.transport;
* buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
* from here if the underlying implementation supports them.
*/
-public interface NetworkTransportConfiguration
+public interface NetworkDriverConfiguration
{
// Taken from Socket
+ Boolean getKeepAlive();
+ Boolean getOOBInline();
+ Boolean getReuseAddress();
+ Integer getSoLinger(); // null means off
+ Integer getSoTimeout();
Boolean getTcpNoDelay();
+ Integer getTrafficClass();
// The amount of memory in bytes to allocate to the incoming buffer
Integer getReceiveBufferSize();
// The amount of memory in bytes to allocate to the outgoing buffer
- Integer getSendBufferSize();
-
- Integer getPort();
-
- String getHost();
-
- String getTransport();
-
- Integer getConnectorProcessors();
-}
+ Integer getSendBufferSize();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 11af86f412..f21df251da 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -75,7 +75,10 @@ public class ServerDelegate extends ConnectionDelegate
if (mechanism == null || mechanism.length() == 0)
{
- tuneAuthorizedConnection(conn);
+ conn.connectionTune
+ (getChannelMax(),
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, getHeartbeatMax());
return;
}
@@ -94,7 +97,8 @@ public class ServerDelegate extends ConnectionDelegate
}
catch (SaslException e)
{
- connectionAuthFailed(conn, e);
+ conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
}
@@ -105,52 +109,33 @@ public class ServerDelegate extends ConnectionDelegate
return ss;
}
- protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
+ private void secure(Connection conn, byte[] response)
{
+ SaslServer ss = conn.getSaslServer();
try
{
byte[] challenge = ss.evaluateResponse(response);
if (ss.isComplete())
{
ss.dispose();
- tuneAuthorizedConnection(conn);
+ conn.connectionTune
+ (getChannelMax(),
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, getHeartbeatMax());
+ conn.setAuthorizationID(ss.getAuthorizationID());
}
else
{
- connectionAuthContinue(conn, challenge);
+ conn.connectionSecure(challenge);
}
}
catch (SaslException e)
{
- connectionAuthFailed(conn, e);
+ conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
}
- protected void connectionAuthFailed(final Connection conn, Exception e)
- {
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
- }
-
- protected void connectionAuthContinue(final Connection conn, byte[] challenge)
- {
- conn.connectionSecure(challenge);
- }
-
- protected void tuneAuthorizedConnection(final Connection conn)
- {
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
- }
-
- protected void secure(final Connection conn, final byte[] response)
- {
- final SaslServer ss = conn.getSaslServer();
- secure(ss, conn, response);
- }
-
protected int getHeartbeatMax()
{
return 0xFFFF;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index e0c6cb29d3..214d4534c1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -42,10 +42,7 @@ import static org.apache.qpid.util.Serial.max;
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -120,9 +117,7 @@ public class Session extends SessionInvoker
private Thread resumer = null;
private boolean transacted = false;
- private SessionDetachCode detachCode;
- private final Object stateLock = new Object();
-
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -267,32 +262,7 @@ public class Session extends SessionInvoker
}
else if (m instanceof MessageTransfer)
{
- MessageTransfer xfr = (MessageTransfer)m;
-
- if (xfr.getHeader() != null)
- {
- if (xfr.getHeader().get(DeliveryProperties.class) != null)
- {
- xfr.getHeader().get(DeliveryProperties.class).setRedelivered(true);
- }
- else
- {
- Struct[] structs = xfr.getHeader().getStructs();
- DeliveryProperties deliveryProps = new DeliveryProperties();
- deliveryProps.setRedelivered(true);
-
- List<Struct> list = Arrays.asList(structs);
- list.add(deliveryProps);
- xfr.setHeader(new Header(list));
- }
-
- }
- else
- {
- DeliveryProperties deliveryProps = new DeliveryProperties();
- deliveryProps.setRedelivered(true);
- xfr.setHeader(new Header(deliveryProps));
- }
+ ((MessageTransfer)m).getHeader().get(DeliveryProperties.class).setRedelivered(true);
}
sessionCommandPoint(m.getId(), 0);
send(m);
@@ -452,10 +422,7 @@ public class Session extends SessionInvoker
{
return;
}
- if (copy.size() > 0)
- {
- sessionCompleted(copy, options);
- }
+ sessionCompleted(copy, options);
}
}
@@ -694,12 +661,7 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
-
- boolean replayTransfer = !closing && !transacted &&
- m instanceof MessageTransfer &&
- ! m.isUnreliable();
-
- if ((replayTransfer) || m.hasCompletionListener())
+ if ((!closing && !transacted && m instanceof MessageTransfer) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -1033,8 +995,7 @@ public class Session extends SessionInvoker
if(state == CLOSED)
{
- connection.removeSession(this);
- listener.closed(this);
+ connection.removeSession(this);
}
}
@@ -1047,54 +1008,13 @@ public class Session extends SessionInvoker
{
return String.format("ssn:%s", name);
}
-
+
public void setTransacted(boolean b) {
this.transacted = b;
}
-
+
public boolean isTransacted(){
return transacted;
}
-
- public void setDetachCode(SessionDetachCode dtc)
- {
- this.detachCode = dtc;
- }
-
- public SessionDetachCode getDetachCode()
- {
- return this.detachCode;
- }
-
- public void awaitOpen()
- {
- switch (state)
- {
- case NEW:
- synchronized(stateLock)
- {
- Waiter w = new Waiter(stateLock, timeout);
- while (w.hasTime() && state == NEW)
- {
- w.await();
- }
- }
-
- if (state != OPEN)
- {
- throw new SessionException("Timed out waiting for Session to open");
- }
- case DETACHED:
- case CLOSING:
- case CLOSED:
- throw new SessionException("Session closed");
- default :
- break;
- }
- }
-
- public Object getStateLock()
- {
- return stateLock;
- }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 3341149e5f..5d8e4d5565 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -76,10 +76,6 @@ public class SessionDelegate
@Override public void sessionAttached(Session ssn, SessionAttached atc)
{
ssn.setState(Session.State.OPEN);
- synchronized (ssn.getStateLock())
- {
- ssn.getStateLock().notifyAll();
- }
}
@Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -206,19 +202,11 @@ public class SessionDelegate
public void closed(Session session)
{
- log.debug("CLOSED: [%s]", session);
- synchronized (session.getStateLock())
- {
- session.getStateLock().notifyAll();
- }
+ log.warn("CLOSED: [%s]", session);
}
public void detached(Session session)
{
- log.debug("DETACHED: [%s]", session);
- synchronized (session.getStateLock())
- {
- session.getStateLock().notifyAll();
- }
+ log.warn("DETACHED: [%s]", session);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
deleted file mode 100644
index 2c7652abeb..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.qpid.transport;
-
-import org.apache.mina.common.IoConnector;
-
-public interface SocketConnectorFactory
-{
- IoConnector newConnector();
-} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
new file mode 100644
index 0000000000..c08909c6e4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+
+public class TransportBuilder
+{
+ private Connection con;
+ private ConnectionSettings settings;
+ private NetworkTransport transport;
+ private SecurityLayer securityLayer = new SecurityLayer();
+
+ public void init(Connection con) throws TransportException
+ {
+ this.con = con;
+ this.settings = con.getConnectionSettings();
+ transport = Transport.getTransport();
+ transport.init(settings);
+ securityLayer.init(con);
+ }
+
+ public Sender<ProtocolEvent> buildSenderPipe()
+ {
+ ConnectionSettings settings = con.getConnectionSettings();
+
+ // Io layer
+ Sender<ByteBuffer> sender = transport.sender();
+
+ // Security layer
+ sender = securityLayer.sender(sender);
+
+ Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize());
+ return dis;
+ }
+
+ public void buildReceiverPipe(Receiver<ProtocolEvent> delegate)
+ {
+ Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate));
+
+ // Security layer
+ receiver = securityLayer.receiver(receiver);
+
+ //Io layer
+ transport.receiver(receiver);
+ }
+
+ public SecurityLayer getSecurityLayer()
+ {
+ return securityLayer;
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index 0ccfcfcb70..908d14a307 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -63,7 +63,6 @@ abstract class AbstractEncoder implements Encoder
ENCODINGS.put(Double.class, Type.DOUBLE);
ENCODINGS.put(Character.class, Type.CHAR);
ENCODINGS.put(byte[].class, Type.VBIN32);
- ENCODINGS.put(UUID.class, Type.UUID);
}
private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
deleted file mode 100644
index 7099916c33..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network;
-
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-
-public interface IncomingNetworkTransport extends NetworkTransport
-{
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory);
-} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
index 4610c2351e..5e12d7e7c6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
@@ -20,13 +20,19 @@
*/
package org.apache.qpid.transport.network;
-/**
- * A network transport is responsible for the establishment of network connections.
- * NetworkTransport implementations are pluggable via the {@link Transport} class.
- */
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.ConnectionSettings;
+
public interface NetworkTransport
{
+ public void init(ConnectionSettings settings);
+
+ public Sender<ByteBuffer> sender();
+
+ public void receiver(Receiver<ByteBuffer> delegate);
+
public void close();
-
- public NetworkConnection getConnection();
-}
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index 2c10a30f10..f0bf04d04f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,129 +7,50 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
-package org.apache.qpid.transport.network;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+package org.apache.qpid.transport.network;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.transport.TransportException;
public class Transport
-{
- public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport";
- public static final String QPID_TRANSPORT_V0_8_PROPNAME = "qpid.transport.v0_8";
- public static final String QPID_TRANSPORT_V0_9_PROPNAME = "qpid.transport.v0_9";
- public static final String QPID_TRANSPORT_V0_9_1_PROPNAME = "qpid.transport.v0_9_1";
- public static final String QPID_TRANSPORT_V0_10_PROPNAME = "qpid.transport.v0_10";
- public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport";
-
- // Can't reference the class directly here, as this would preclude the ability to bundle transports separately.
- private static final String MINA_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.mina.MinaNetworkTransport";
- private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport";
-
- public static final String TCP = "tcp";
-
- private final static Map<ProtocolVersion,String> OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP;
-
- static
- {
- final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>();
- map.put(ProtocolVersion.v8_0, MINA_TRANSPORT_CLASSNAME);
- map.put(ProtocolVersion.v0_9, MINA_TRANSPORT_CLASSNAME);
- map.put(ProtocolVersion.v0_91, MINA_TRANSPORT_CLASSNAME);
- map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME);
-
- OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
- }
-
- public static IncomingNetworkTransport getIncomingTransportInstance()
+{
+ private final static Class<?> transportClass;
+
+ static
{
- return (IncomingNetworkTransport) loadTransportClass(
- System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, MINA_TRANSPORT_CLASSNAME));
- }
-
- public static OutgoingNetworkTransport getOutgoingTransportInstance(
- final ProtocolVersion protocolVersion)
- {
-
- final String overrride = getOverrideClassNameFromSystemProperty(protocolVersion);
- final String networkTransportClassName;
- if (overrride != null)
- {
- networkTransportClassName = overrride;
- }
- else
- {
- networkTransportClassName = OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP.get(protocolVersion);
- }
-
- return (OutgoingNetworkTransport) loadTransportClass(networkTransportClassName);
- }
-
- private static NetworkTransport loadTransportClass(final String networkTransportClassName)
- {
- if (networkTransportClassName == null)
- {
- throw new IllegalArgumentException("transport class name must not be null");
- }
-
try
{
- final Class<?> clazz = Class.forName(networkTransportClassName);
- return (NetworkTransport) clazz.newInstance();
+ transportClass =
+ Class.forName(System.getProperty("qpid.transport",
+ "org.apache.qpid.transport.network.io.IoNetworkTransport"));
+
}
- catch (InstantiationException e)
+ catch(Exception e)
{
- throw new TransportException("Unable to instantiate transport class " + networkTransportClassName, e);
- }
- catch (IllegalAccessException e)
- {
- throw new TransportException("Access exception " + networkTransportClassName, e);
- }
- catch (ClassNotFoundException e)
- {
- throw new TransportException("Unable to load transport class " + networkTransportClassName, e);
+ throw new Error("Error occured while loading Qpid Transport",e);
}
}
-
- private static String getOverrideClassNameFromSystemProperty(final ProtocolVersion protocolVersion)
+
+ public static NetworkTransport getTransport() throws TransportException
{
- final String protocolSpecificSystemProperty;
-
- if (ProtocolVersion.v0_10.equals(protocolVersion))
- {
- protocolSpecificSystemProperty = QPID_TRANSPORT_V0_10_PROPNAME;
- }
- else if (ProtocolVersion.v0_91.equals(protocolVersion))
- {
- protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_1_PROPNAME;
- }
- else if (ProtocolVersion.v0_9.equals(protocolVersion))
- {
- protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_PROPNAME;
- }
- else if (ProtocolVersion.v8_0.equals(protocolVersion))
+ try
{
- protocolSpecificSystemProperty = QPID_TRANSPORT_V0_8_PROPNAME;
+ return (NetworkTransport)transportClass.newInstance();
}
- else
+ catch (Exception e)
{
- throw new IllegalArgumentException("Unknown ProtocolVersion " + protocolVersion);
+ throw new TransportException("Error while creating a new transport instance",e);
}
-
- return System.getProperty(protocolSpecificSystemProperty, System.getProperty(QPID_TRANSPORT_PROPNAME));
}
-}
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
new file mode 100644
index 0000000000..ecc5f6d07c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
@@ -0,0 +1,130 @@
+package org.apache.qpid.transport.network.io;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentBodyFactory;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderBodyFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.HeartbeatBodyFactory;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Receiver;
+
+public class InputHandler_0_9 implements Receiver<ByteBuffer>
+{
+
+ private AMQVersionAwareProtocolSession _session;
+ private MethodRegistry _registry;
+ private BodyFactory bodyFactory;
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ public InputHandler_0_9(AMQVersionAwareProtocolSession session)
+ {
+ _session = session;
+ _registry = _session.getMethodRegistry();
+ }
+
+ public void closed()
+ {
+ // AS FIXME: implement
+ }
+
+ public void exception(Throwable t)
+ {
+ // TODO: propogate exception to things
+ t.printStackTrace();
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
+ try
+ {
+ final byte type = in.get();
+ if (type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = new AMQMethodBodyFactory(_session);
+ }
+ else
+ {
+ bodyFactory = _bodiesSupported[type];
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ }
+
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize, null);
+ }
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type, null);
+ }
+
+ try
+ {
+ frame.getBodyFrame().handle(frame.getChannel(), _session);
+ }
+ catch (AMQException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
new file mode 100644
index 0000000000..8530240dcc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.TransportException;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * IoAcceptor
+ *
+ */
+
+public class IoAcceptor<E> extends Thread
+{
+
+
+ private ServerSocket socket;
+ private Binding<E,ByteBuffer> binding;
+
+ public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ socket = new ServerSocket();
+ socket.setReuseAddress(true);
+ socket.bind(address);
+ this.binding = binding;
+
+ setName(String.format("IoAcceptor - %s", socket.getInetAddress()));
+ }
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close() throws IOException
+ {
+ if (!socket.isClosed())
+ {
+ socket.close();
+ }
+ }
+
+ public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ this(new InetSocketAddress(host, port), binding);
+ }
+
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ Socket sock = socket.accept();
+ IoTransport<E> transport = new IoTransport<E>(sock, binding,false);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
index ff86ba481f..69b3a0ce45 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,26 +7,29 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
-package org.apache.qpid.transport.network;
+package org.apache.qpid.transport.network.io;
+import java.net.Socket;
import java.nio.ByteBuffer;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
-public interface OutgoingNetworkTransport extends NetworkTransport
+public interface IoContext
{
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory);
-} \ No newline at end of file
+ Sender<ByteBuffer> getSender();
+
+ IoReceiver getReceiver();
+
+ Socket getSocket();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
deleted file mode 100644
index cca1fc46c9..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.io;
-
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IoNetworkConnection implements NetworkConnection
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class);
- private final Socket _socket;
- private final long _timeout;
- private final IoSender _ioSender;
- private final IoReceiver _ioReceiver;
-
- public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
- int sendBufferSize, int receiveBufferSize, long timeout)
- {
- _socket = socket;
- _timeout = timeout;
-
- _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
- _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
- _ioSender.registerCloseListener(_ioReceiver);
-
- _ioReceiver.initiate();
- _ioSender.initiate();
- }
-
- public Sender<ByteBuffer> getSender()
- {
- return _ioSender;
- }
-
- public void close()
- {
- try
- {
- _ioSender.close();
- }
- finally
- {
- _ioReceiver.close(false);
- }
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _socket.getRemoteSocketAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _socket.getLocalSocketAddress();
- }
-
- public void setMaxWriteIdle(int sec)
- {
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
- }
-
- public void setMaxReadIdle(int sec)
- {
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index d611ab1cf3..dd6a37eca2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -27,15 +27,14 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
-import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements OutgoingNetworkTransport
+public class IoNetworkTransport implements NetworkTransport, IoContext
{
static
{
@@ -45,31 +44,34 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
(Boolean.getBoolean("amqj.enableDirectBuffers"));
}
- private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
+ private static final Logger log = Logger.get(IoNetworkTransport.class);
- private Socket _socket;
- private IoNetworkConnection _connection;
- private long _timeout = 60000;
+ private Socket socket;
+ private Sender<ByteBuffer> sender;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+ private ConnectionSettings settings;
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
+ public void init(ConnectionSettings settings)
{
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
try
{
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(settings.isTcpNodelay());
- _socket.setSendBufferSize(sendBufferSize);
- _socket.setReceiveBufferSize(receiveBufferSize);
+ this.settings = settings;
+ InetAddress address = InetAddress.getByName(settings.getHost());
+ socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(settings.isTcpNodelay());
- LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
+ log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
- InetAddress address = InetAddress.getByName(settings.getHost());
+ socket.setSendBufferSize(settings.getWriteBufferSize());
+ socket.setReceiveBufferSize(settings.getReadBufferSize());
- _socket.connect(new InetSocketAddress(address, settings.getPort()));
+ log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.connect(new InetSocketAddress(address, settings.getPort()));
}
catch (SocketException e)
{
@@ -79,35 +81,36 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
{
throw new TransportException("Error connecting to broker", e);
}
+ }
- try
- {
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
- }
- catch(Exception e)
- {
- try
- {
- _socket.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
+ public void receiver(Receiver<ByteBuffer> delegate)
+ {
+ receiver = new IoReceiver(this, delegate,
+ 2*settings.getReadBufferSize() , timeout);
+ }
- return _connection;
+ public Sender<ByteBuffer> sender()
+ {
+ return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
}
public void close()
{
- _connection.close();
+
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return sender;
+ }
+
+ public IoReceiver getReceiver()
+ {
+ return receiver;
}
- public NetworkConnection getConnection()
+ public Socket getSocket()
{
- return _connection;
+ return socket;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index fea87fc350..19a683d505 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.transport.network.io;
-import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
@@ -38,54 +37,43 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-final class IoReceiver implements Runnable, Closeable
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
+ private final IoContext ioCtx;
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
- private static final boolean shutdownBroken;
- static
- {
- String osName = System.getProperty("os.name");
- shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
- }
+ private final boolean shutdownBroken =
+ ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
- public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+ public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
+ int bufferSize, long timeout)
{
+ this.ioCtx = ioCtx;
this.receiver = receiver;
this.bufferSize = bufferSize;
- this.socket = socket;
+ this.socket = ioCtx.getSocket();
this.timeout = timeout;
try
{
- //Create but deliberately don't start the thread.
receiverThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
- throw new RuntimeException("Error creating IOReceiver thread",e);
+ throw new Error("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
receiverThread.start();
}
- public void close()
- {
- close(false);
- }
-
void close(boolean block)
{
if (!closed.getAndSet(true))
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 1bb515624c..66b97e8225 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -24,11 +24,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
@@ -46,6 +43,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
+ private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -58,13 +56,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
- private final List<Closeable> _listeners = new ArrayList<Closeable>();
private volatile Throwable exception = null;
- public IoSender(Socket socket, int bufferSize, long timeout)
+
+ public IoSender(IoContext ioCtx, int bufferSize, long timeout)
{
- this.socket = socket;
+ this.ioCtx = ioCtx;
+ this.socket = ioCtx.getSocket();
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -79,7 +78,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
- //Create but deliberately don't start the thread.
senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
@@ -89,10 +87,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
senderThread.start();
}
@@ -210,20 +204,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
senderThread.join(timeout);
if (senderThread.isAlive())
{
- log.error("join timed out");
throw new SenderException("join timed out");
}
}
+ ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
- log.error("interrupted whilst waiting for sender thread to stop");
throw new SenderException(e);
}
- finally
- {
- closeListeners();
- }
+
if (reportException && exception != null)
{
throw new SenderException(exception);
@@ -231,28 +221,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
- private void closeListeners()
- {
- Exception ex = null;
- for(Closeable listener : _listeners)
- {
- try
- {
- listener.close();
- }
- catch(Exception e)
- {
- log.error("Exception closing listener: " + e.getMessage());
- ex = e;
- }
- }
-
- if (ex != null)
- {
- throw new SenderException(ex.getMessage(), ex);
- }
- }
-
public void run()
{
final int size = buffer.length;
@@ -336,9 +304,4 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
throw new SenderException(e);
}
}
-
- public void registerCloseListener(Closeable listener)
- {
- _listeners.add(listener);
- }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
new file mode 100644
index 0000000000..bfdbb34978
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.security.ssl.SSLSender;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * This class provides a socket based transport using the java.io
+ * classes.
+ *
+ * The following params are configurable via JVM arguments
+ * TCP_NO_DELAY - amqj.tcpNoDelay
+ * SO_RCVBUF - amqj.receiveBufferSize
+ * SO_SNDBUF - amqj.sendBufferSize
+ */
+public final class IoTransport<E> implements IoContext
+{
+
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator
+ (new org.apache.mina.common.SimpleByteBufferAllocator());
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+ (Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
+
+ private static final Logger log = Logger.get(IoTransport.class);
+
+ private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+ private static int readBufferSize = Integer.getInteger
+ ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+ private static int writeBufferSize = Integer.getInteger
+ ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+
+ private Socket socket;
+ private Sender<ByteBuffer> sender;
+ private E endpoint;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+
+ IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
+ {
+ this.socket = socket;
+
+ if (ssl)
+ {
+ SSLEngine engine = null;
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = createSSLContext();
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
+ }
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+
+ this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+ 2*readBufferSize, timeout);
+
+ log.info("SSL Sender and Receiver initiated");
+ }
+ else
+ {
+ this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ }
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return sender;
+ }
+
+ public IoReceiver getReceiver()
+ {
+ return receiver;
+ }
+
+ public Socket getSocket()
+ {
+ return socket;
+ }
+
+ public static final <E> E connect(String host, int port,
+ Binding<E,ByteBuffer> binding,
+ boolean ssl)
+ {
+ Socket socket = createSocket(host, port);
+ IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
+ return transport.endpoint;
+ }
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate,
+ boolean ssl)
+ {
+ return connect(host, port, ConnectionBinding.get(delegate),ssl);
+ }
+
+ public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl)
+ {
+ connect(host, port, new Binding_0_9(session),ssl);
+ }
+
+ private static class Binding_0_9
+ implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
+ {
+
+ private AMQVersionAwareProtocolSession session;
+
+ Binding_0_9(AMQVersionAwareProtocolSession session)
+ {
+ this.session = session;
+ }
+
+ public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
+ {
+ session.setSender(sender);
+ return session;
+ }
+
+ public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
+ {
+ return new InputHandler_0_9(ssn);
+ }
+
+ }
+
+ private static Socket createSocket(String host, int port)
+ {
+ try
+ {
+ InetAddress address = InetAddress.getByName(host);
+ Socket socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+
+ log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.setSendBufferSize(writeBufferSize);
+ socket.setReceiveBufferSize(readBufferSize);
+
+ log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.connect(new InetSocketAddress(address, port));
+ return socket;
+ }
+ catch (SocketException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ }
+
+ private SSLContext createSSLContext() throws Exception
+ {
+ String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
+ String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
+ String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
+
+ String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
+ String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
+ String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
+
+ SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
+ trustStoreCertType,keyStorePath,
+ keyStorePassword,keyStoreCertType);
+
+ return sslContextFactory.buildServerContext();
+
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
new file mode 100644
index 0000000000..0f2c0d0226
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -0,0 +1,435 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExecutorThreadModel;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.SessionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
+{
+
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ ProtocolEngine _protocolEngine;
+ private boolean _useNIO = false;
+ private int _processors = 4;
+ private boolean _executorPool = false;
+ private SSLContextFactory _sslFactory = null;
+ private IoConnector _socketConnector;
+ private IoAcceptor _acceptor;
+ private IoSession _ioSession;
+ private ProtocolEngineFactory _factory;
+ private boolean _protectIO;
+ private NetworkDriverConfiguration _config;
+ private Throwable _lastException;
+ private boolean _acceptingConnections = false;
+
+ private WriteFuture _lastWriteFuture;
+
+ private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class);
+
+ static
+ {
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+
+ //override the MINA defaults to prevent use of the PooledByteBufferAllocator
+ org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ }
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO,
+ ProtocolEngine protocolEngine, IoSession session)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ _protocolEngine = protocolEngine;
+ _ioSession = session;
+ _ioSession.setAttachment(_protocolEngine);
+ }
+
+ public MINANetworkDriver()
+ {
+
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector)
+ {
+ _socketConnector = ioConnector;
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+ {
+ _socketConnector = ioConnector;
+ _protocolEngine = engine;
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ _factory = factory;
+ _config = config;
+
+ if (_useNIO)
+ {
+ _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
+ new NewThreadExecutor());
+ }
+ else
+ {
+ _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
+ }
+
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+ sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)"));
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+
+ if (config != null)
+ {
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+ sc.setSendBufferSize(config.getSendBufferSize());
+ sc.setTcpNoDelay(config.getTcpNoDelay());
+ }
+
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
+ if (addresses != null && addresses.length > 0)
+ {
+ for (InetAddress addr : addresses)
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to %1s:%2s", addr, port));
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to *:%1s", port));
+ }
+ }
+ _acceptingConnections = true;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _ioSession.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _ioSession.getLocalAddress();
+ }
+
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLContextFactory sslFactory) throws OpenException
+ {
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
+ if (_useNIO)
+ {
+ _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
+ }
+ else
+ {
+ _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking
+ // connector
+ }
+
+ SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
+ String s = "";
+ StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+ for(StackTraceElement elt : trace)
+ {
+ if(elt.getClassName().contains("Test"))
+ {
+ s = elt.getClassName();
+ break;
+ }
+ }
+ cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s));
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
+ scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
+ scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
+
+ // Don't have the connector's worker thread wait around for other
+ // connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows
+ // short-running
+ // clients (like unit tests) to complete quickly.
+ if (_socketConnector instanceof SocketConnector)
+ {
+ ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+ }
+
+ ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
+ future.join();
+ if (!future.isConnected())
+ {
+ throw new OpenException("Could not open connection", _lastException);
+ }
+ _ioSession = future.getSession();
+ _ioSession.setAttachment(engine);
+ engine.setNetworkDriver(this);
+ _protocolEngine = engine;
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime);
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
+ }
+
+ public void close()
+ {
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join();
+ }
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
+ if (_ioSession != null)
+ {
+ _ioSession.close();
+ }
+ }
+
+ public void flush()
+ {
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join();
+ }
+ }
+
+ public void send(ByteBuffer msg)
+ {
+ org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity());
+ minaBuf.put(msg);
+ minaBuf.flip();
+ _lastWriteFuture = _ioSession.write(minaBuf);
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ // MINA doesn't support setting SO_TIMEOUT
+ }
+
+ public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
+ {
+ if (_protocolEngine != null)
+ {
+ _protocolEngine.exception(throwable);
+ }
+ else
+ {
+ _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable);
+ }
+ _lastException = throwable;
+ }
+
+ /**
+ * Invoked when a message is received on a particular protocol session. Note
+ * that a protocol session is directly tied to a particular physical
+ * connection.
+ *
+ * @param protocolSession
+ * the protocol session that received the message
+ * @param message
+ * the message itself (i.e. a decoded frame)
+ *
+ * @throws Exception
+ * if the message cannot be processed
+ */
+ public void messageReceived(IoSession protocolSession, Object message) throws Exception
+ {
+ if (message instanceof org.apache.mina.common.ByteBuffer)
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf());
+ }
+ else
+ {
+ throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
+ }
+ }
+
+ public void sessionClosed(IoSession protocolSession) throws Exception
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).closed();
+ }
+
+ public void sessionCreated(IoSession protocolSession) throws Exception
+ {
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
+ {
+ if (_sslFactory != null)
+ {
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+ else
+ {
+ if (_sslFactory != null)
+ {
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
+
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
+
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
+
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
+ if (_acceptingConnections)
+ {
+ // Set up the protocol engine
+ ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
+ MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+ protocolEngine.setNetworkDriver(newDriver);
+ }
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).writerIdle();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
+ }
+ }
+
+ private ProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
+ public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections)
+ {
+ _factory = engineFactory;
+ _acceptingConnections = acceptingConnections;
+ }
+
+ public void setProtocolEngine(ProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ if (_ioSession != null)
+ {
+ _ioSession.setAttachment(protocolEngine);
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
new file mode 100644
index 0000000000..b89eed48b0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
@@ -0,0 +1,274 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.mina;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.*;
+
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.ConnectionBinding;
+
+import org.apache.qpid.transport.util.Logger;
+
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
+/**
+ * MinaHandler
+ *
+ * @author Rafael H. Schloming
+ */
+//RA making this public until we sort out the package issues
+public class MinaHandler<E> implements IoHandler
+{
+ /** Default buffer size for pending messages reads */
+ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
+ /** Default buffer size for pending messages writes */
+ private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
+ private static final int MAX_RCVBUF = 64*1024;
+
+ private static final Logger log = Logger.get(MinaHandler.class);
+
+ static
+ {
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ }
+
+ private final Binding<E,java.nio.ByteBuffer> binding;
+
+ private MinaHandler(Binding<E,java.nio.ByteBuffer> binding)
+ {
+ this.binding = binding;
+ }
+
+ public void messageReceived(IoSession ssn, Object obj)
+ {
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
+ ByteBuffer buf = (ByteBuffer) obj;
+ try
+ {
+ attachment.receiver.received(buf.buf());
+ }
+ catch (Throwable t)
+ {
+ log.error(t, "exception handling buffer %s", str(buf.buf()));
+ throw new RuntimeException(t);
+ }
+ }
+
+ public void messageSent(IoSession ssn, Object obj)
+ {
+ // do nothing
+ }
+
+ public void exceptionCaught(IoSession ssn, Throwable e)
+ {
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
+ attachment.receiver.exception(e);
+ }
+
+ /**
+ * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
+ * session, which filters the events handled by this handler. The filter chain consists of, handing off events
+ * to an optional protectio
+ *
+ * @param session The MINA session.
+ * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
+ */
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ log.debug("Protocol session created for session " + System.identityHashCode(session));
+
+ if (Boolean.getBoolean("protectio"))
+ {
+ try
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = session.getFilterChain();
+
+ session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(
+ Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(
+ Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
+ writefilter.attach(chain);
+ session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+
+ log.info("Using IO Read/Write Filter Protection");
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+ }
+ }
+ }
+
+ public void sessionOpened(final IoSession ssn)
+ {
+ log.debug("opened: %s", this);
+ E endpoint = binding.endpoint(new MinaSender(ssn));
+ Attachment<E> attachment =
+ new Attachment<E>(endpoint, binding.receiver(endpoint));
+
+ // We need to synchronize and notify here because the MINA
+ // connect future returns the session prior to the attachment
+ // being set. This is arguably a bug in MINA.
+ synchronized (ssn)
+ {
+ ssn.setAttachment(attachment);
+ ssn.notifyAll();
+ }
+ }
+
+ public void sessionClosed(IoSession ssn)
+ {
+ log.debug("closed: %s", ssn);
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
+ attachment.receiver.closed();
+ ssn.setAttachment(null);
+ }
+
+ public void sessionIdle(IoSession ssn, IdleStatus status)
+ {
+ // do nothing
+ }
+
+ private static class Attachment<E>
+ {
+
+ E endpoint;
+ Receiver<java.nio.ByteBuffer> receiver;
+
+ Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver)
+ {
+ this.endpoint = endpoint;
+ this.receiver = receiver;
+ }
+ }
+
+ public static final void accept(String host, int port,
+ Binding<?,java.nio.ByteBuffer> binding)
+ throws IOException
+ {
+ accept(new InetSocketAddress(host, port), binding);
+ }
+
+ public static final <E> void accept(SocketAddress address,
+ Binding<E,java.nio.ByteBuffer> binding)
+ throws IOException
+ {
+ IoAcceptor acceptor = new SocketAcceptor();
+ acceptor.bind(address, new MinaHandler<E>(binding));
+ }
+
+ public static final <E> E connect(String host, int port,
+ Binding<E,java.nio.ByteBuffer> binding)
+ {
+ return connect(new InetSocketAddress(host, port), binding);
+ }
+
+ public static final <E> E connect(SocketAddress address,
+ Binding<E,java.nio.ByteBuffer> binding)
+ {
+ MinaHandler<E> handler = new MinaHandler<E>(binding);
+ SocketConnector connector = new SocketConnector();
+ IoServiceConfig acceptorConfig = connector.getDefaultConfig();
+ acceptorConfig.setThreadModel(ThreadModel.MANUAL);
+ SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig();
+ scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+ Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize");
+ if (sendBufferSize != null && sendBufferSize > 0)
+ {
+ scfg.setSendBufferSize(sendBufferSize);
+ }
+ Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize");
+ if (receiveBufferSize != null && receiveBufferSize > 0)
+ {
+ scfg.setReceiveBufferSize(receiveBufferSize);
+ }
+ else if (scfg.getReceiveBufferSize() > MAX_RCVBUF)
+ {
+ scfg.setReceiveBufferSize(MAX_RCVBUF);
+ }
+ connector.setWorkerTimeout(0);
+ ConnectFuture cf = connector.connect(address, handler);
+ cf.join();
+ IoSession ssn = cf.getSession();
+
+ // We need to synchronize and wait here because the MINA
+ // connect future returns the session prior to the attachment
+ // being set. This is arguably a bug in MINA.
+ synchronized (ssn)
+ {
+ while (ssn.getAttachment() == null)
+ {
+ try
+ {
+ ssn.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
+ return attachment.endpoint;
+ }
+
+ public static final void accept(String host, int port,
+ ConnectionDelegate delegate)
+ throws IOException
+ {
+ accept(host, port, ConnectionBinding.get(delegate));
+ }
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ return connect(host, port, ConnectionBinding.get(delegate));
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
deleted file mode 100644
index 0f433f6eeb..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class MinaNetworkConnection implements NetworkConnection
-{
- private IoSession _session;
- private Sender<ByteBuffer> _sender;
-
- public MinaNetworkConnection(IoSession session)
- {
- _session = session;
- _sender = new MinaSender(_session);
- }
-
- public Sender<ByteBuffer> getSender()
- {
- return _sender;
- }
-
- public void close()
- {
- _session.close();
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _session.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _session.getLocalAddress();
- }
-
- public long getReadBytes()
- {
- return _session.getReadBytes();
- }
-
- public long getWrittenBytes()
- {
- return _session.getWrittenBytes();
- }
-
- public void setMaxWriteIdle(int sec)
- {
- _session.setIdleTime(IdleStatus.WRITER_IDLE, sec);
- }
-
- public void setMaxReadIdle(int sec)
- {
- _session.setIdleTime(IdleStatus.READER_IDLE, sec);
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
deleted file mode 100644
index c00187480c..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport.network.mina;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MinaNetworkHandler extends IoHandlerAdapter
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class);
-
- private ProtocolEngineFactory _factory;
- private SSLContextFactory _sslFactory = null;
-
- static
- {
- boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers");
- LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers");
- ByteBuffer.setUseDirectBuffers(directBuffers);
-
- //override the MINA defaults to prevent use of the PooledByteBufferAllocator
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- }
-
- public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory)
- {
- _sslFactory = sslFactory;
- _factory = factory;
- }
-
- public MinaNetworkHandler(SSLContextFactory sslFactory)
- {
- this(sslFactory, null);
- }
-
- public void messageReceived(IoSession session, Object message)
- {
- ProtocolEngine engine = (ProtocolEngine) session.getAttachment();
- ByteBuffer buf = (ByteBuffer) message;
- try
- {
- engine.received(buf.buf());
- }
- catch (RuntimeException re)
- {
- engine.exception(re);
- }
- }
-
- public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception
- {
- ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
- if(engine != null)
- {
- LOGGER.error("Exception caught by Mina", throwable);
- engine.exception(throwable);
- }
- else
- {
- LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable);
- }
- }
-
- public void sessionCreated(IoSession ioSession) throws Exception
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Created session: " + ioSession.getRemoteAddress());
- }
-
- SessionUtil.initialize(ioSession);
-
- if (_sslFactory != null)
- {
- ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
-
- if (_factory != null)
- {
- NetworkConnection netConn = new MinaNetworkConnection(ioSession);
-
- ProtocolEngine engine = _factory.newProtocolEngine(netConn);
- ioSession.setAttachment(engine);
- }
- }
-
- public void sessionClosed(IoSession ioSession) throws Exception
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("closed: " + ioSession.getRemoteAddress());
- }
-
- ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment();
- if(engine != null)
- {
- engine.closed();
- }
- else
- {
- LOGGER.error("Unable to close ProtocolEngine as none was present");
- }
- }
-
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).writerIdle();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- ((ProtocolEngine) session.getAttachment()).readerIdle();
- }
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
deleted file mode 100644
index d0367b82f4..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExecutorThreadModel;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
-
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.thread.QpidThreadExecutor;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SocketConnectorFactory;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
-{
- private static final int UNKNOWN = -1;
- private static final int TCP = 0;
-
- public NetworkConnection _connection;
- private SocketAcceptor _acceptor;
- private InetSocketAddress _address;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory)
- {
- int transport = getTransport(settings.getProtocol());
-
- IoConnectorCreator stc;
- switch(transport)
- {
- case TCP:
- stc = new IoConnectorCreator(new SocketConnectorFactory()
- {
- public IoConnector newConnector()
- {
- return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
- }
- });
- _connection = stc.connect(delegate, settings, sslFactory);
- break;
- case UNKNOWN:
- default:
- throw new TransportException("Unknown protocol: " + settings.getProtocol());
- }
-
- return _connection;
- }
-
- private static int getTransport(String transport)
- {
- if (transport.equals(Transport.TCP))
- {
- return TCP;
- }
-
- return UNKNOWN;
- }
-
- public void close()
- {
- if(_connection != null)
- {
- _connection.close();
- }
- if (_acceptor != null)
- {
- _acceptor.unbindAll();
- }
- }
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-
- public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory,
- final SSLContextFactory sslFactory)
- {
- int processors = config.getConnectorProcessors();
-
- if (Transport.TCP.equalsIgnoreCase(config.getTransport()))
- {
- _acceptor = new SocketAcceptor(processors, new NewThreadExecutor());
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)"));
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
- sc.setTcpNoDelay(config.getTcpNoDelay());
- sc.setSendBufferSize(config.getSendBufferSize());
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
-
- if (config.getHost().equals(WILDCARD_ADDRESS))
- {
- _address = new InetSocketAddress(config.getPort());
- }
- else
- {
- _address = new InetSocketAddress(config.getHost(), config.getPort());
- }
- }
- else
- {
- throw new TransportException("Unknown transport: " + config.getTransport());
- }
-
- try
- {
- _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory));
- }
- catch (IOException e)
- {
- throw new TransportException("Could not bind to " + _address, e);
- }
- }
-
-
- private static class IoConnectorCreator
- {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class);
-
- private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024;
-
- private SocketConnectorFactory _ioConnectorFactory;
-
- public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory)
- {
- _ioConnectorFactory = socketConnectorFactory;
- }
-
- public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContextFactory sslFactory)
- {
- final IoConnector ioConnector = _ioConnectorFactory.newConnector();
- final SocketAddress address;
- final String protocol = settings.getProtocol();
- final int port = settings.getPort();
-
- if (Transport.TCP.equalsIgnoreCase(protocol))
- {
- address = new InetSocketAddress(settings.getHost(), port);
- }
- else
- {
- throw new TransportException("Unknown transport: " + protocol);
- }
-
- LOGGER.info("Attempting connection to " + address);
-
- if (ioConnector instanceof SocketConnector)
- {
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)"));
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay(true);
- scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
- scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE);
-
- // Don't have the connector's worker thread wait around for other
- // connections (we only use one SocketConnector per connection
- // at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- ((SocketConnector) ioConnector).setWorkerTimeout(0);
- }
-
- ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig());
- future.join();
- if (!future.isConnected())
- {
- throw new TransportException("Could not open connection");
- }
-
- IoSession session = future.getSession();
- session.setAttachment(receiver);
-
- return new MinaNetworkConnection(session);
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
index be114e2fa1..22b9c5e784 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -25,55 +25,66 @@ import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
import org.apache.qpid.transport.Sender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.TransportException;
+
/**
* MinaSender
*/
+
public class MinaSender implements Sender<java.nio.ByteBuffer>
{
- private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
-
- private final IoSession _session;
- private WriteFuture _lastWrite;
+ private static final int TIMEOUT = 2 * 60 * 1000;
+
+ private final IoSession session;
+ private WriteFuture lastWrite = null;
public MinaSender(IoSession session)
{
- _session = session;
+ this.session = session;
}
- public synchronized void send(java.nio.ByteBuffer msg)
+ public void send(java.nio.ByteBuffer buf)
{
- _log.debug("sending data:");
- ByteBuffer mina = ByteBuffer.allocate(msg.limit());
- mina.put(msg);
- mina.flip();
- _lastWrite = _session.write(mina);
- _log.debug("sent data:");
- }
+ if (session.isClosing())
+ {
+ throw new TransportException("attempted to write to a closed socket");
+ }
- public synchronized void flush()
- {
- if (_lastWrite != null)
+ synchronized (this)
{
- _lastWrite.join();
+ lastWrite = session.write(ByteBuffer.wrap(buf));
}
}
- public void close()
+ public void flush()
{
- // MINA will sometimes throw away in-progress writes when you ask it to close
- flush();
- CloseFuture closed = _session.close();
+ // pass
+ }
+
+ public synchronized void close()
+ {
+ // MINA will sometimes throw away in-progress writes when you
+ // ask it to close
+ synchronized (this)
+ {
+ if (lastWrite != null)
+ {
+ lastWrite.join();
+ }
+ }
+ CloseFuture closed = session.close();
closed.join();
}
public void setIdleTimeout(int i)
{
- //TODO:
- //We are instead using the setMax[Read|Write]IdleTime methods in
- //MinaNetworkConnection for this. Should remove this method from
- //sender interface, but currently being used by IoSender for 0-10.
+ //noop
}
+
+ public long getIdleTimeout()
+ {
+ return 0;
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
new file mode 100644
index 0000000000..84e66c25bd
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
@@ -0,0 +1,135 @@
+package org.apache.qpid.transport.network.nio;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+
+public class NioHandler implements Runnable
+{
+ private Receiver<ByteBuffer> _receiver;
+ private SocketChannel _ch;
+ private ByteBuffer _readBuf;
+ private static Map<Long,NioSender> _handlers = new ConcurrentHashMap<Long,NioSender>();
+
+ private NioHandler(){}
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ NioHandler handler = new NioHandler();
+ return handler.connectInternal(host,port,delegate);
+ }
+
+ private Connection connectInternal(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ try
+ {
+ SocketAddress address = new InetSocketAddress(host,port);
+ _ch = SocketChannel.open();
+ _ch.socket().setReuseAddress(true);
+ _ch.configureBlocking(true);
+ _ch.socket().setTcpNoDelay(true);
+ if (address != null)
+ {
+ _ch.socket().connect(address);
+ }
+ while (_ch.isConnectionPending())
+ {
+
+ }
+
+ }
+ catch (SocketException e)
+ {
+
+ e.printStackTrace();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+
+ NioSender sender = new NioSender(_ch);
+ Connection con = new Connection();
+ con.setSender(new Disassembler(sender, 64*1024 - 1));
+ con.setConnectionDelegate(delegate);
+
+ _handlers.put(con.getConnectionId(),sender);
+
+ _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR);
+
+ Thread t = new Thread(this);
+ t.start();
+
+ return con;
+ }
+
+ public void run()
+ {
+ _readBuf = ByteBuffer.allocate(512);
+ long read = 0;
+ while(_ch.isConnected() && _ch.isOpen())
+ {
+ try
+ {
+ read = _ch.read(_readBuf);
+ if (read > 0)
+ {
+ _readBuf.flip();
+ ByteBuffer b = ByteBuffer.allocate(_readBuf.remaining());
+ b.put(_readBuf);
+ b.flip();
+ _readBuf.clear();
+ _receiver.received(b);
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ //throw new EOFException("The underlying socket/channel has closed");
+ }
+
+ public static void startBatchingFrames(int connectionId)
+ {
+ NioSender sender = _handlers.get(connectionId);
+ sender.setStartBatching();
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
new file mode 100644
index 0000000000..2fa875f279
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
@@ -0,0 +1,126 @@
+package org.apache.qpid.transport.network.nio;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpid.transport.Sender;
+
+public class NioSender implements Sender<java.nio.ByteBuffer>
+{
+ private final Object lock = new Object();
+ private SocketChannel _ch;
+ private boolean _batch = false;
+ private ByteBuffer _batcher;
+
+ public NioSender(SocketChannel ch)
+ {
+ this._ch = ch;
+ }
+
+ public void send(java.nio.ByteBuffer buf)
+ {
+ if (_batch)
+ {
+ //System.out.println(_batcher.position() + " , " + buf.remaining() + " , " + buf.position() + ","+_batcher.capacity());
+ if (_batcher.position() + buf.remaining() >= _batcher.capacity())
+ {
+ _batcher.flip();
+ write(_batcher);
+ _batcher.clear();
+ if (buf.remaining() > _batcher.capacity())
+ {
+ write(buf);
+ }
+ else
+ {
+ _batcher.put(buf);
+ }
+ }
+ else
+ {
+ _batcher.put(buf);
+ }
+ }
+ else
+ {
+ write(buf);
+ }
+ }
+
+ public void flush()
+ {
+ // pass
+ }
+
+ private void write(java.nio.ByteBuffer buf)
+ {
+ synchronized (lock)
+ {
+ if( _ch.isConnected() && _ch.isOpen())
+ {
+ try
+ {
+ _ch.write(buf);
+ }
+ catch(Exception e)
+ {
+ e.fillInStackTrace();
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Trying to write on a closed socket");
+ }
+
+ }
+ }
+
+ public void setStartBatching()
+ {
+ _batch = true;
+ _batcher = ByteBuffer.allocate(1024);
+ }
+
+ public void close()
+ {
+ // MINA will sometimes throw away in-progress writes when you
+ // ask it to close
+ synchronized (lock)
+ {
+ try
+ {
+ _ch.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ //noop
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 69e4b52edb..3f0966903d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -43,8 +43,8 @@ public class SecurityLayer
Connection con;
SSLSecurityLayer sslLayer;
SASLSecurityLayer saslLayer;
-
- public SecurityLayer(Connection con)
+
+ public void init(Connection con) throws TransportException
{
this.con = con;
this.settings = con.getConnectionSettings();
@@ -55,9 +55,10 @@ public class SecurityLayer
if (settings.isUseSASLEncryption())
{
saslLayer = new SASLSecurityLayer();
- }
+ }
+
}
-
+
public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
{
Sender<ByteBuffer> sender = delegate;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 2d9e4e9a7e..27255f79f6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -43,7 +43,8 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
this.delegate = delegate;
log.debug("SASL Sender enabled");
}
-
+
+ @Override
public void close()
{
@@ -64,11 +65,13 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
}
}
+ @Override
public void flush()
{
delegate.flush();
}
+ @Override
public void send(ByteBuffer buf)
{
if (closed.get())
@@ -105,6 +108,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
}
}
+ @Override
public void setIdleTimeout(int i)
{
delegate.setIdleTimeout(i);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
index 0dd86d4560..14f28f8828 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
@@ -48,45 +48,51 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager
kmf.init(ks, keyStorePassword.toCharArray());
this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
}
-
+
+ @Override
public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
{
log.debug("chooseClientAlias:Returning alias " + alias);
return alias;
}
+ @Override
public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
{
return delegate.chooseServerAlias(keyType, issuers, socket);
}
+ @Override
public X509Certificate[] getCertificateChain(String alias)
{
return delegate.getCertificateChain(alias);
}
+ @Override
public String[] getClientAliases(String keyType, Principal[] issuers)
{
log.debug("getClientAliases:Returning alias " + alias);
return new String[]{alias};
}
+ @Override
public PrivateKey getPrivateKey(String alias)
{
return delegate.getPrivateKey(alias);
}
+ @Override
public String[] getServerAliases(String keyType, Principal[] issuers)
{
return delegate.getServerAliases(keyType, issuers);
}
-
+
public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
{
log.debug("chooseEngineClientAlias:Returning alias " + alias);
return alias;
}
-
+
public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)
{
return delegate.chooseEngineServerAlias(keyType, issuers, engine);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
index e261860bf3..6f21c327e7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
@@ -31,6 +31,9 @@ public class URLHelper
public static void parseOptions(Map<String, String> optionMap, String options) throws URLSyntaxException
{
+ // options looks like this
+ // brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value''
+
if ((options == null) || (options.indexOf('=') == -1))
{
return;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
index ac8e3da3c2..516204fbd3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
@@ -143,9 +143,8 @@ public class FileUtils
}
/**
- * Either opens the specified filename as an input stream or either the filesystem or classpath,
- * or uses the default resource loaded using the specified class loader, if opening the file fails
- * or no file name is specified.
+ * Either opens the specified filename as an input stream, or uses the default resource loaded using the
+ * specified class loader, if opening the file fails or no file name is specified.
*
* @param filename The name of the file to open.
* @param defaultResource The name of the default resource on the classpath if the file cannot be opened.
@@ -157,28 +156,28 @@ public class FileUtils
{
InputStream is = null;
+ // Flag to indicate whether the default resource should be used. By default this is true, so that the default
+ // is used when opening the file fails.
+ boolean useDefault = true;
+
// Try to open the file if one was specified.
if (filename != null)
{
- // try on filesystem
try
{
is = new BufferedInputStream(new FileInputStream(new File(filename)));
+
+ // Clear the default flag because the file was succesfully opened.
+ useDefault = false;
}
catch (FileNotFoundException e)
{
- is = null;
- }
-
- if (is == null)
- {
- // failed on filesystem, so try on classpath
- is = cl.getResourceAsStream(filename);
+ // Ignore this exception, the default will be used instead.
}
}
// Load the default resource if a file was not specified, or if opening the file failed.
- if (is == null)
+ if (useDefault)
{
is = cl.getResourceAsStream(defaultResource);
}
@@ -340,7 +339,7 @@ public class FileUtils
}
//else we have a source directory
- if (!dst.isDirectory() && !dst.mkdirs())
+ if (!dst.isDirectory() && !dst.mkdir())
{
throw new UnableToCopyException("Unable to create destination directory");
}