summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:16:33 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:16:33 +0000
commita8555fea06aad38999620ce4de09a5e5c6b63fd5 (patch)
tree104e32c0efbf22e3b3fe3427ab9c6ab2d646f886 /java/common
parent155ed9d858367d218e43144061b9c1225dd30e2b (diff)
downloadqpid-python-a8555fea06aad38999620ce4de09a5e5c6b63fd5.tar.gz
QPID-3343: refactor test profiles to allow testing within the same JVM for all protocols, remove vm:// transport support and associated forked Mina classes
Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143874 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java467
-rw-r--r--java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java227
-rw-r--r--java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java351
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java197
-rw-r--r--java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java440
-rw-r--r--java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java151
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Transport.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java33
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLHelper.java3
-rw-r--r--java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java6
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java3
12 files changed, 8 insertions, 1922 deletions
diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
deleted file mode 100644
index 0c311b6645..0000000000
--- a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
+++ /dev/null
@@ -1,467 +0,0 @@
-package org.apache.mina.common;
-
-import org.apache.mina.common.ByteBuffer;
-
-import java.nio.*;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class FixedSizeByteBufferAllocator implements ByteBufferAllocator
-{
-
-
- private static final int MINIMUM_CAPACITY = 1;
-
- public FixedSizeByteBufferAllocator ()
- {
- }
-
- public ByteBuffer allocate( int capacity, boolean direct )
- {
- java.nio.ByteBuffer nioBuffer;
- if( direct )
- {
- nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity );
- }
- else
- {
- nioBuffer = java.nio.ByteBuffer.allocate( capacity );
- }
- return new FixedSizeByteBuffer( nioBuffer );
- }
-
- public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer )
- {
- return new FixedSizeByteBuffer( nioBuffer );
- }
-
- public void dispose()
- {
- }
-
-
-
- private static final class FixedSizeByteBuffer extends ByteBuffer
- {
- private java.nio.ByteBuffer buf;
- private int mark = -1;
-
-
- protected FixedSizeByteBuffer( java.nio.ByteBuffer buf )
- {
- this.buf = buf;
- buf.order( ByteOrder.BIG_ENDIAN );
- }
-
- public synchronized void acquire()
- {
- }
-
- public void release()
- {
- }
-
- public java.nio.ByteBuffer buf()
- {
- return buf;
- }
-
- public boolean isPooled()
- {
- return false;
- }
-
- public void setPooled( boolean pooled )
- {
- }
-
- public ByteBuffer duplicate() {
- return new FixedSizeByteBuffer( this.buf.duplicate() );
- }
-
- public ByteBuffer slice() {
- return new FixedSizeByteBuffer( this.buf.slice() );
- }
-
- public ByteBuffer asReadOnlyBuffer() {
- return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() );
- }
-
- public byte[] array()
- {
- return buf.array();
- }
-
- public int arrayOffset()
- {
- return buf.arrayOffset();
- }
-
- public boolean isDirect()
- {
- return buf.isDirect();
- }
-
- public boolean isReadOnly()
- {
- return buf.isReadOnly();
- }
-
- public int capacity()
- {
- return buf.capacity();
- }
-
- public ByteBuffer capacity( int newCapacity )
- {
- if( newCapacity > capacity() )
- {
- throw new IllegalArgumentException();
- }
-
- return this;
- }
-
-
-
- public boolean isAutoExpand()
- {
- return false;
- }
-
- public ByteBuffer setAutoExpand( boolean autoExpand )
- {
- if(autoExpand) throw new IllegalArgumentException();
- else return this;
- }
-
- public ByteBuffer expand( int pos, int expectedRemaining )
- {
- int end = pos + expectedRemaining;
- if( end > capacity() )
- {
- // The buffer needs expansion.
- capacity( end );
- }
-
- if( end > limit() )
- {
- // We call limit() directly to prevent StackOverflowError
- buf.limit( end );
- }
- return this;
- }
-
- public int position()
- {
- return buf.position();
- }
-
- public ByteBuffer position( int newPosition )
- {
-
- buf.position( newPosition );
- if( mark > newPosition )
- {
- mark = -1;
- }
- return this;
- }
-
- public int limit()
- {
- return buf.limit();
- }
-
- public ByteBuffer limit( int newLimit )
- {
- buf.limit( newLimit );
- if( mark > newLimit )
- {
- mark = -1;
- }
- return this;
- }
-
- public ByteBuffer mark()
- {
- buf.mark();
- mark = position();
- return this;
- }
-
- public int markValue()
- {
- return mark;
- }
-
- public ByteBuffer reset()
- {
- buf.reset();
- return this;
- }
-
- public ByteBuffer clear()
- {
- buf.clear();
- mark = -1;
- return this;
- }
-
- public ByteBuffer flip()
- {
- buf.flip();
- mark = -1;
- return this;
- }
-
- public ByteBuffer rewind()
- {
- buf.rewind();
- mark = -1;
- return this;
- }
-
- public byte get()
- {
- return buf.get();
- }
-
- public ByteBuffer put( byte b )
- {
- buf.put( b );
- return this;
- }
-
- public byte get( int index )
- {
- return buf.get( index );
- }
-
- public ByteBuffer put( int index, byte b )
- {
- buf.put( index, b );
- return this;
- }
-
- public ByteBuffer get( byte[] dst, int offset, int length )
- {
- buf.get( dst, offset, length );
- return this;
- }
-
- public ByteBuffer put( java.nio.ByteBuffer src )
- {
- buf.put( src );
- return this;
- }
-
- public ByteBuffer put( byte[] src, int offset, int length )
- {
- buf.put( src, offset, length );
- return this;
- }
-
- public ByteBuffer compact()
- {
- buf.compact();
- mark = -1;
- return this;
- }
-
- public ByteOrder order()
- {
- return buf.order();
- }
-
- public ByteBuffer order( ByteOrder bo )
- {
- buf.order( bo );
- return this;
- }
-
- public char getChar()
- {
- return buf.getChar();
- }
-
- public ByteBuffer putChar( char value )
- {
- buf.putChar( value );
- return this;
- }
-
- public char getChar( int index )
- {
- return buf.getChar( index );
- }
-
- public ByteBuffer putChar( int index, char value )
- {
- buf.putChar( index, value );
- return this;
- }
-
- public CharBuffer asCharBuffer()
- {
- return buf.asCharBuffer();
- }
-
- public short getShort()
- {
- return buf.getShort();
- }
-
- public ByteBuffer putShort( short value )
- {
- buf.putShort( value );
- return this;
- }
-
- public short getShort( int index )
- {
- return buf.getShort( index );
- }
-
- public ByteBuffer putShort( int index, short value )
- {
- buf.putShort( index, value );
- return this;
- }
-
- public ShortBuffer asShortBuffer()
- {
- return buf.asShortBuffer();
- }
-
- public int getInt()
- {
- return buf.getInt();
- }
-
- public ByteBuffer putInt( int value )
- {
- buf.putInt( value );
- return this;
- }
-
- public int getInt( int index )
- {
- return buf.getInt( index );
- }
-
- public ByteBuffer putInt( int index, int value )
- {
- buf.putInt( index, value );
- return this;
- }
-
- public IntBuffer asIntBuffer()
- {
- return buf.asIntBuffer();
- }
-
- public long getLong()
- {
- return buf.getLong();
- }
-
- public ByteBuffer putLong( long value )
- {
- buf.putLong( value );
- return this;
- }
-
- public long getLong( int index )
- {
- return buf.getLong( index );
- }
-
- public ByteBuffer putLong( int index, long value )
- {
- buf.putLong( index, value );
- return this;
- }
-
- public LongBuffer asLongBuffer()
- {
- return buf.asLongBuffer();
- }
-
- public float getFloat()
- {
- return buf.getFloat();
- }
-
- public ByteBuffer putFloat( float value )
- {
- buf.putFloat( value );
- return this;
- }
-
- public float getFloat( int index )
- {
- return buf.getFloat( index );
- }
-
- public ByteBuffer putFloat( int index, float value )
- {
- buf.putFloat( index, value );
- return this;
- }
-
- public FloatBuffer asFloatBuffer()
- {
- return buf.asFloatBuffer();
- }
-
- public double getDouble()
- {
- return buf.getDouble();
- }
-
- public ByteBuffer putDouble( double value )
- {
- buf.putDouble( value );
- return this;
- }
-
- public double getDouble( int index )
- {
- return buf.getDouble( index );
- }
-
- public ByteBuffer putDouble( int index, double value )
- {
- buf.putDouble( index, value );
- return this;
- }
-
- public DoubleBuffer asDoubleBuffer()
- {
- return buf.asDoubleBuffer();
- }
-
-
- }
-
-
-}
diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
deleted file mode 100644
index 4fd28c4eb5..0000000000
--- a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.common.support;
-
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFutureListener;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * A default implementation of {@link org.apache.mina.common.IoFuture}.
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- */
-public class DefaultIoFuture implements IoFuture
-{
- private final IoSession session;
- private final Object lock;
- private List listeners;
- private Object result;
- private boolean ready;
-
-
- /**
- * Creates a new instance.
- *
- * @param session an {@link IoSession} which is associated with this future
- */
- public DefaultIoFuture( IoSession session )
- {
- this.session = session;
- this.lock = this;
- }
-
- /**
- * Creates a new instance which uses the specified object as a lock.
- */
- public DefaultIoFuture( IoSession session, Object lock )
- {
- if( lock == null )
- {
- throw new NullPointerException( "lock" );
- }
- this.session = session;
- this.lock = lock;
- }
-
- public IoSession getSession()
- {
- return session;
- }
-
- public Object getLock()
- {
- return lock;
- }
-
- public void join()
- {
- synchronized( lock )
- {
- while( !ready )
- {
- try
- {
- lock.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
- }
-
- public boolean join( long timeoutInMillis )
- {
- long startTime = ( timeoutInMillis <= 0 ) ? 0 : System
- .currentTimeMillis();
- long waitTime = timeoutInMillis;
-
- synchronized( lock )
- {
- if( ready )
- {
- return ready;
- }
- else if( waitTime <= 0 )
- {
- return ready;
- }
-
- for( ;; )
- {
- try
- {
- lock.wait( waitTime );
- }
- catch( InterruptedException e )
- {
- }
-
- if( ready )
- return true;
- else
- {
- waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime );
- if( waitTime <= 0 )
- {
- return ready;
- }
- }
- }
- }
- }
-
- public boolean isReady()
- {
- synchronized( lock )
- {
- return ready;
- }
- }
-
- /**
- * Sets the result of the asynchronous operation, and mark it as finished.
- */
- protected void setValue( Object newValue )
- {
- synchronized( lock )
- {
- // Allow only once.
- if( ready )
- {
- return;
- }
-
- result = newValue;
- ready = true;
- lock.notifyAll();
-
- notifyListeners();
- }
- }
-
- /**
- * Returns the result of the asynchronous operation.
- */
- protected Object getValue()
- {
- synchronized( lock )
- {
- return result;
- }
- }
-
- public void addListener( IoFutureListener listener )
- {
- if( listener == null )
- {
- throw new NullPointerException( "listener" );
- }
-
- synchronized( lock )
- {
- if(listeners == null)
- {
- listeners = new ArrayList();
- }
- listeners.add( listener );
- if( ready )
- {
- listener.operationComplete( this );
- }
- }
- }
-
- public void removeListener( IoFutureListener listener )
- {
- if( listener == null )
- {
- throw new NullPointerException( "listener" );
- }
-
- synchronized( lock )
- {
- listeners.remove( listener );
- }
- }
-
- private void notifyListeners()
- {
- synchronized( lock )
- {
-
- if(listeners != null)
- {
-
- for( Iterator i = listeners.iterator(); i.hasNext(); ) {
- ( ( IoFutureListener ) i.next() ).operationComplete( this );
- }
- }
- }
- }
-}
-
-
-
diff --git a/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java b/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
deleted file mode 100644
index 5723ffbaa9..0000000000
--- a/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.common.support;
-
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.mina.common.IoAcceptorConfig;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFuture;
-import org.apache.mina.common.IoFutureListener;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoServiceListener;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.util.IdentityHashSet;
-
-/**
- * A helper which provides addition and removal of {@link IoServiceListener}s and firing
- * events.
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 446526 $, $Date: 2006-09-15 01:44:11 -0400 (Fri, 15 Sep 2006) $
- */
-public class IoServiceListenerSupport
-{
- /**
- * A list of {@link IoServiceListener}s.
- */
- private final List listeners = new ArrayList();
-
- /**
- * Tracks managed <tt>serviceAddress</tt>es.
- */
- private final Set managedServiceAddresses = new HashSet();
-
- /**
- * Tracks managed sesssions with <tt>serviceAddress</tt> as a key.
- */
- private final Map managedSessions = new HashMap();
-
- /**
- * Creates a new instance.
- */
- public IoServiceListenerSupport()
- {
- }
-
- /**
- * Adds a new listener.
- */
- public void add( IoServiceListener listener )
- {
- synchronized( listeners )
- {
- listeners.add( listener );
- }
- }
-
- /**
- * Removes an existing listener.
- */
- public void remove( IoServiceListener listener )
- {
- synchronized( listeners )
- {
- listeners.remove( listener );
- }
- }
-
- public Set getManagedServiceAddresses()
- {
- return Collections.unmodifiableSet( managedServiceAddresses );
- }
-
- public boolean isManaged( SocketAddress serviceAddress )
- {
- synchronized( managedServiceAddresses )
- {
- return managedServiceAddresses.contains( serviceAddress );
- }
- }
-
- public Set getManagedSessions( SocketAddress serviceAddress )
- {
- Set sessions;
- synchronized( managedSessions )
- {
- sessions = ( Set ) managedSessions.get( serviceAddress );
- if( sessions == null )
- {
- sessions = new IdentityHashSet();
- }
- }
-
- synchronized( sessions )
- {
- return new IdentityHashSet( sessions );
- }
- }
-
- /**
- * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
- * for all registered listeners.
- */
- public void fireServiceActivated(
- IoService service, SocketAddress serviceAddress,
- IoHandler handler, IoServiceConfig config )
- {
- synchronized( managedServiceAddresses )
- {
- if( !managedServiceAddresses.add( serviceAddress ) )
- {
- return;
- }
- }
-
- synchronized( listeners )
- {
- for( Iterator i = listeners.iterator(); i.hasNext(); )
- {
- ( ( IoServiceListener ) i.next() ).serviceActivated(
- service, serviceAddress, handler, config );
- }
- }
- }
-
- /**
- * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
- * for all registered listeners.
- */
- public synchronized void fireServiceDeactivated(
- IoService service, SocketAddress serviceAddress,
- IoHandler handler, IoServiceConfig config )
- {
- synchronized( managedServiceAddresses )
- {
- if( !managedServiceAddresses.remove( serviceAddress ) )
- {
- return;
- }
- }
-
- try
- {
- synchronized( listeners )
- {
- for( Iterator i = listeners.iterator(); i.hasNext(); )
- {
- ( ( IoServiceListener ) i.next() ).serviceDeactivated(
- service, serviceAddress, handler, config );
- }
- }
- }
- finally
- {
- disconnectSessions( serviceAddress, config );
- }
- }
-
-
- /**
- * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
- */
- public void fireSessionCreated( IoSession session )
- {
- SocketAddress serviceAddress = session.getServiceAddress();
-
- // Get the session set.
- boolean firstSession = false;
- Set sessions;
- synchronized( managedSessions )
- {
- sessions = ( Set ) managedSessions.get( serviceAddress );
- if( sessions == null )
- {
- sessions = new IdentityHashSet();
- managedSessions.put( serviceAddress, sessions );
- firstSession = true;
- }
- }
-
- // If already registered, ignore.
- synchronized( sessions )
- {
- if ( !sessions.add( session ) )
- {
- return;
- }
- }
-
- // If the first connector session, fire a virtual service activation event.
- if( session.getService() instanceof IoConnector && firstSession )
- {
- fireServiceActivated(
- session.getService(), session.getServiceAddress(),
- session.getHandler(), session.getServiceConfig() );
- }
-
- // Fire session events.
- session.getFilterChain().fireSessionCreated( session );
- session.getFilterChain().fireSessionOpened( session);
-
- // Fire listener events.
- synchronized( listeners )
- {
- for( Iterator i = listeners.iterator(); i.hasNext(); )
- {
- ( ( IoServiceListener ) i.next() ).sessionCreated( session );
- }
- }
- }
-
- /**
- * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
- */
- public void fireSessionDestroyed( IoSession session )
- {
- SocketAddress serviceAddress = session.getServiceAddress();
-
- // Get the session set.
- Set sessions;
- boolean lastSession = false;
- synchronized( managedSessions )
- {
- sessions = ( Set ) managedSessions.get( serviceAddress );
- // Ignore if unknown.
- if( sessions == null )
- {
- return;
- }
-
- // Try to remove the remaining empty seession set after removal.
- synchronized( sessions )
- {
- sessions.remove( session );
- if( sessions.isEmpty() )
- {
- managedSessions.remove( serviceAddress );
- lastSession = true;
- }
- }
- }
-
- // Fire session events.
- session.getFilterChain().fireSessionClosed( session );
-
- // Fire listener events.
- try
- {
- synchronized( listeners )
- {
- for( Iterator i = listeners.iterator(); i.hasNext(); )
- {
- ( ( IoServiceListener ) i.next() ).sessionDestroyed( session );
- }
- }
- }
- finally
- {
- // Fire a virtual service deactivation event for the last session of the connector.
- //TODO double-check that this is *STILL* the last session. May not be the case
- if( session.getService() instanceof IoConnector && lastSession )
- {
- fireServiceDeactivated(
- session.getService(), session.getServiceAddress(),
- session.getHandler(), session.getServiceConfig() );
- }
- }
- }
-
- private void disconnectSessions( SocketAddress serviceAddress, IoServiceConfig config )
- {
- if( !( config instanceof IoAcceptorConfig ) )
- {
- return;
- }
-
- if( !( ( IoAcceptorConfig ) config ).isDisconnectOnUnbind() )
- {
- return;
- }
-
- Set sessions;
- synchronized( managedSessions )
- {
- sessions = ( Set ) managedSessions.get( serviceAddress );
- }
-
- if( sessions == null )
- {
- return;
- }
-
- Set sessionsCopy;
-
- // Create a copy to avoid ConcurrentModificationException
- synchronized( sessions )
- {
- sessionsCopy = new IdentityHashSet( sessions );
- }
-
- final CountDownLatch latch = new CountDownLatch(sessionsCopy.size());
-
- for( Iterator i = sessionsCopy.iterator(); i.hasNext(); )
- {
- ( ( IoSession ) i.next() ).close().addListener( new IoFutureListener()
- {
- public void operationComplete( IoFuture future )
- {
- latch.countDown();
- }
- } );
- }
-
- try
- {
- latch.await();
- }
- catch( InterruptedException ie )
- {
- // Ignored
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java b/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
deleted file mode 100644
index 3f7e206cb4..0000000000
--- a/java/common/src/main/java/org/apache/mina/filter/codec/OurCumulativeProtocolDecoder.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.filter.codec;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-
-/**
- * A {@link ProtocolDecoder} that cumulates the content of received
- * buffers to a <em>cumulative buffer</em> to help users implement decoders.
- * <p>
- * If the received {@link ByteBuffer} is only a part of a message.
- * decoders should cumulate received buffers to make a message complete or
- * to postpone decoding until more buffers arrive.
- * <p>
- * Here is an example decoder that decodes CRLF terminated lines into
- * <code>Command</code> objects:
- * <pre>
- * public class CRLFTerminatedCommandLineDecoder
- * extends CumulativeProtocolDecoder {
- *
- * private Command parseCommand(ByteBuffer in) {
- * // Convert the bytes in the specified buffer to a
- * // Command object.
- * ...
- * }
- *
- * protected boolean doDecode(IoSession session, ByteBuffer in,
- * ProtocolDecoderOutput out)
- * throws Exception {
- *
- * // Remember the initial position.
- * int start = in.position();
- *
- * // Now find the first CRLF in the buffer.
- * byte previous = 0;
- * while (in.hasRemaining()) {
- * byte current = in.get();
- *
- * if (previous == '\r' && current == '\n') {
- * // Remember the current position and limit.
- * int position = in.position();
- * int limit = in.limit();
- * try {
- * in.position(start);
- * in.limit(position);
- * // The bytes between in.position() and in.limit()
- * // now contain a full CRLF terminated line.
- * out.write(parseCommand(in.slice()));
- * } finally {
- * // Set the position to point right after the
- * // detected line and set the limit to the old
- * // one.
- * in.position(position);
- * in.limit(limit);
- * }
- * // Decoded one line; CumulativeProtocolDecoder will
- * // call me again until I return false. So just
- * // return true until there are no more lines in the
- * // buffer.
- * return true;
- * }
- *
- * previous = current;
- * }
- *
- * // Could not find CRLF in the buffer. Reset the initial
- * // position to the one we recorded above.
- * in.position(start);
- *
- * return false;
- * }
- * }
- * </pre>
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public abstract class OurCumulativeProtocolDecoder extends ProtocolDecoderAdapter {
-
- private static final String BUFFER = OurCumulativeProtocolDecoder.class
- .getName()
- + ".Buffer";
-
- /**
- * Creates a new instance.
- */
- protected OurCumulativeProtocolDecoder() {
- }
-
- /**
- * Cumulates content of <tt>in</tt> into internal buffer and forwards
- * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- * and the cumulative buffer is NOT compacted after decoding ends.
- *
- * @throws IllegalStateException if your <tt>doDecode()</tt> returned
- * <tt>true</tt> not consuming the cumulative buffer.
- */
- public void decode(IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- boolean usingSessionBuffer = true;
- ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
- // If we have a session buffer, append data to that; otherwise
- // use the buffer read from the network directly.
- if (buf != null) {
- buf.put(in);
- buf.flip();
- } else {
- buf = in;
- usingSessionBuffer = false;
- }
-
- for (;;) {
- int oldPos = buf.position();
- boolean decoded = doDecode(session, buf, out);
- if (decoded) {
- if (buf.position() == oldPos) {
- throw new IllegalStateException(
- "doDecode() can't return true when buffer is not consumed.");
- }
-
- if (!buf.hasRemaining()) {
- break;
- }
- } else {
- break;
- }
- }
-
-
- // if there is any data left that cannot be decoded, we store
- // it in a buffer in the session and next time this decoder is
- // invoked the session buffer gets appended to
- if (buf.hasRemaining()) {
- storeRemainingInSession(buf, session);
- } else {
- if (usingSessionBuffer)
- removeSessionBuffer(session);
- }
- }
-
- /**
- * Implement this method to consume the specified cumulative buffer and
- * decode its content into message(s).
- *
- * @param in the cumulative buffer
- * @return <tt>true</tt> if and only if there's more to decode in the buffer
- * and you want to have <tt>doDecode</tt> method invoked again.
- * Return <tt>false</tt> if remaining data is not enough to decode,
- * then this method will be invoked again when more data is cumulated.
- * @throws Exception if cannot decode <tt>in</tt>.
- */
- protected abstract boolean doDecode(IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out) throws Exception;
-
- /**
- * Releases the cumulative buffer used by the specified <tt>session</tt>.
- * Please don't forget to call <tt>super.dispose( session )</tt> when
- * you override this method.
- */
- public void dispose(IoSession session) throws Exception {
- removeSessionBuffer(session);
- }
-
- private void removeSessionBuffer(IoSession session) {
- ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
- if (buf != null) {
- buf.release();
- }
- }
-
- private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
- ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
- remainingBuf.setAutoExpand(true);
- remainingBuf.order(buf.order());
- remainingBuf.put(buf);
- session.setAttribute(BUFFER, remainingBuf);
- }
-}
diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
deleted file mode 100644
index b8c6f29720..0000000000
--- a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java
+++ /dev/null
@@ -1,440 +0,0 @@
-package org.apache.mina.filter.codec;
-
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.DefaultWriteFuture;
-import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
-import org.apache.mina.util.SessionLog;
-import org.apache.mina.util.Queue;
-
-
-public class QpidProtocolCodecFilter extends IoFilterAdapter
-{
- public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";
- public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";
-
- private static final Class[] EMPTY_PARAMS = new Class[0];
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
-
- private final ProtocolCodecFactory factory;
-
- public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
- {
- if( factory == null )
- {
- throw new NullPointerException( "factory" );
- }
- this.factory = factory;
- }
-
- public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
- {
- if( encoder == null )
- {
- throw new NullPointerException( "encoder" );
- }
- if( decoder == null )
- {
- throw new NullPointerException( "decoder" );
- }
-
- this.factory = new ProtocolCodecFactory()
- {
- public ProtocolEncoder getEncoder()
- {
- return encoder;
- }
-
- public ProtocolDecoder getDecoder()
- {
- return decoder;
- }
- };
- }
-
- public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
- {
- if( encoderClass == null )
- {
- throw new NullPointerException( "encoderClass" );
- }
- if( decoderClass == null )
- {
- throw new NullPointerException( "decoderClass" );
- }
- if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
- {
- throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
- }
- if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
- {
- throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
- }
- try
- {
- encoderClass.getConstructor( EMPTY_PARAMS );
- }
- catch( NoSuchMethodException e )
- {
- throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
- }
- try
- {
- decoderClass.getConstructor( EMPTY_PARAMS );
- }
- catch( NoSuchMethodException e )
- {
- throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
- }
-
- this.factory = new ProtocolCodecFactory()
- {
- public ProtocolEncoder getEncoder() throws Exception
- {
- return ( ProtocolEncoder ) encoderClass.newInstance();
- }
-
- public ProtocolDecoder getDecoder() throws Exception
- {
- return ( ProtocolDecoder ) decoderClass.newInstance();
- }
- };
- }
-
- public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
- {
- if( parent.contains( ProtocolCodecFilter.class ) )
- {
- throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
- }
- }
-
- public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
- {
- if( !( message instanceof ByteBuffer ) )
- {
- nextFilter.messageReceived( session, message );
- return;
- }
-
- ByteBuffer in = ( ByteBuffer ) message;
- ProtocolDecoder decoder = getDecoder( session );
- ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
-
- try
- {
- decoder.decode( session, in, decoderOut );
- }
- catch( Throwable t )
- {
- ProtocolDecoderException pde;
- if( t instanceof ProtocolDecoderException )
- {
- pde = ( ProtocolDecoderException ) t;
- }
- else
- {
- pde = new ProtocolDecoderException( t );
- }
- pde.setHexdump( in.getHexDump() );
- throw pde;
- }
- finally
- {
- // Dispose the decoder if this session is connectionless.
- if( session.getTransportType().isConnectionless() )
- {
- disposeDecoder( session );
- }
-
- // Release the read buffer.
- in.release();
-
- decoderOut.flush();
- }
- }
-
- public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
- {
- if( message instanceof HiddenByteBuffer )
- {
- return;
- }
-
- if( !( message instanceof MessageByteBuffer ) )
- {
- nextFilter.messageSent( session, message );
- return;
- }
-
- nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
- }
-
- public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
- {
- Object message = writeRequest.getMessage();
- if( message instanceof ByteBuffer )
- {
- nextFilter.filterWrite( session, writeRequest );
- return;
- }
-
- ProtocolEncoder encoder = getEncoder( session );
- ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
-
- try
- {
- encoder.encode( session, message, encoderOut );
- encoderOut.flush();
- nextFilter.filterWrite(
- session,
- new IoFilter.WriteRequest(
- new MessageByteBuffer( writeRequest.getMessage() ),
- writeRequest.getFuture(), writeRequest.getDestination() ) );
- }
- catch( Throwable t )
- {
- ProtocolEncoderException pee;
- if( t instanceof ProtocolEncoderException )
- {
- pee = ( ProtocolEncoderException ) t;
- }
- else
- {
- pee = new ProtocolEncoderException( t );
- }
- throw pee;
- }
- finally
- {
- // Dispose the encoder if this session is connectionless.
- if( session.getTransportType().isConnectionless() )
- {
- disposeEncoder( session );
- }
- }
- }
-
- public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
- {
- // Call finishDecode() first when a connection is closed.
- ProtocolDecoder decoder = getDecoder( session );
- ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
- try
- {
- decoder.finishDecode( session, decoderOut );
- }
- catch( Throwable t )
- {
- ProtocolDecoderException pde;
- if( t instanceof ProtocolDecoderException )
- {
- pde = ( ProtocolDecoderException ) t;
- }
- else
- {
- pde = new ProtocolDecoderException( t );
- }
- throw pde;
- }
- finally
- {
- // Dispose all.
- disposeEncoder( session );
- disposeDecoder( session );
-
- decoderOut.flush();
- }
-
- nextFilter.sessionClosed( session );
- }
-
- private ProtocolEncoder getEncoder( IoSession session ) throws Exception
- {
- ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
- if( encoder == null )
- {
- encoder = factory.getEncoder();
- session.setAttribute( ENCODER, encoder );
- }
- return encoder;
- }
-
- private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
- {
- return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
- }
-
- private ProtocolDecoder getDecoder( IoSession session ) throws Exception
- {
- ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
- if( decoder == null )
- {
- decoder = factory.getDecoder();
- session.setAttribute( DECODER, decoder );
- }
- return decoder;
- }
-
- private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
- {
- return new SimpleProtocolDecoderOutput( session, nextFilter );
- }
-
- private void disposeEncoder( IoSession session )
- {
- ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
- if( encoder == null )
- {
- return;
- }
-
- try
- {
- encoder.dispose( session );
- }
- catch( Throwable t )
- {
- SessionLog.warn(
- session,
- "Failed to dispose: " + encoder.getClass().getName() +
- " (" + encoder + ')' );
- }
- }
-
- private void disposeDecoder( IoSession session )
- {
- ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
- if( decoder == null )
- {
- return;
- }
-
- try
- {
- decoder.dispose( session );
- }
- catch( Throwable t )
- {
- SessionLog.warn(
- session,
- "Falied to dispose: " + decoder.getClass().getName() +
- " (" + decoder + ')' );
- }
- }
-
- private static class HiddenByteBuffer extends ByteBufferProxy
- {
- private HiddenByteBuffer( ByteBuffer buf )
- {
- super( buf );
- }
- }
-
- private static class MessageByteBuffer extends ByteBufferProxy
- {
- private final Object message;
-
- private MessageByteBuffer( Object message )
- {
- super( EMPTY_BUFFER );
- this.message = message;
- }
-
- public void acquire()
- {
- // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
- }
-
- public void release()
- {
- // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
- }
- }
-
- private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
- {
- private ByteBuffer buffer;
-
- private final IoSession session;
- private final IoFilter.NextFilter nextFilter;
- private final IoFilter.WriteRequest writeRequest;
-
- public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
- {
- this.session = session;
- this.nextFilter = nextFilter;
- this.writeRequest = writeRequest;
- }
-
-
-
- public void write( ByteBuffer buf )
- {
- if(buffer != null)
- {
- flush();
- }
- buffer = buf;
- }
-
- public void mergeAll()
- {
- }
-
- public WriteFuture flush()
- {
- WriteFuture future = null;
- if( buffer == null )
- {
- return null;
- }
- else
- {
- ByteBuffer buf = buffer;
- // Flush only when the buffer has remaining.
- if( buf.hasRemaining() )
- {
- future = doFlush( buf );
- }
-
- }
-
- return future;
- }
-
-
- protected WriteFuture doFlush( ByteBuffer buf )
- {
- WriteFuture future = new DefaultWriteFuture( session );
- nextFilter.filterWrite(
- session,
- new IoFilter.WriteRequest(
- buf,
- future, writeRequest.getDestination() ) );
- return future;
- }
- }
-}
-
diff --git a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java b/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
deleted file mode 100644
index a23e546af5..0000000000
--- a/java/common/src/main/java/org/apache/mina/transport/vmpipe/QpidVmPipeConnector.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.vmpipe;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.common.support.BaseIoConnectorConfig;
-import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.transport.vmpipe.support.VmPipe;
-import org.apache.mina.transport.vmpipe.support.VmPipeIdleStatusChecker;
-import org.apache.mina.transport.vmpipe.support.VmPipeSessionImpl;
-import org.apache.mina.util.AnonymousSocketAddress;
-
-/**
- * Connects to {@link IoHandler}s which is bound on the specified
- * {@link VmPipeAddress}.
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public class QpidVmPipeConnector extends VmPipeConnector
-{
- private static final IoSessionConfig CONFIG = new BaseIoSessionConfig() {};
- private final IoServiceConfig defaultConfig = new BaseIoConnectorConfig()
- {
- public IoSessionConfig getSessionConfig()
- {
- return CONFIG;
- }
- };
-
- /**
- * Creates a new instance.
- */
- public QpidVmPipeConnector()
- {
- }
-
- public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
- {
- return connect( address, null, handler, config );
- }
-
- public ConnectFuture connect( SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config )
- {
- if( address == null )
- throw new NullPointerException( "address" );
- if( handler == null )
- throw new NullPointerException( "handler" );
- if( ! ( address instanceof VmPipeAddress ) )
- throw new IllegalArgumentException(
- "address must be VmPipeAddress." );
-
- if( config == null )
- {
- config = getDefaultConfig();
- }
-
- VmPipe entry = ( VmPipe ) VmPipeAcceptor.boundHandlers.get( address );
- if( entry == null )
- {
- return DefaultConnectFuture.newFailedFuture(
- new IOException( "Endpoint unavailable: " + address ) );
- }
-
- DefaultConnectFuture future = new DefaultConnectFuture();
- VmPipeSessionImpl localSession =
- new VmPipeSessionImpl(
- this,
- config,
- getListeners(),
- new Object(), // lock
- new AnonymousSocketAddress(),
- handler,
- entry );
-
- // initialize acceptor session
- VmPipeSessionImpl remoteSession = localSession.getRemoteSession();
- try
- {
- IoFilterChain filterChain = remoteSession.getFilterChain();
- entry.getAcceptor().getFilterChainBuilder().buildFilterChain( filterChain );
- entry.getConfig().getFilterChainBuilder().buildFilterChain( filterChain );
- entry.getConfig().getThreadModel().buildFilterChain( filterChain );
-
- // The following sentences don't throw any exceptions.
- entry.getListeners().fireSessionCreated( remoteSession );
- VmPipeIdleStatusChecker.getInstance().addSession( remoteSession );
- }
- catch( Throwable t )
- {
- ExceptionMonitor.getInstance().exceptionCaught( t );
- remoteSession.close();
- }
-
-
- // initialize connector session
- try
- {
- IoFilterChain filterChain = localSession.getFilterChain();
- this.getFilterChainBuilder().buildFilterChain( filterChain );
- config.getFilterChainBuilder().buildFilterChain( filterChain );
- config.getThreadModel().buildFilterChain( filterChain );
-
- // The following sentences don't throw any exceptions.
- localSession.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
- getListeners().fireSessionCreated( localSession );
- VmPipeIdleStatusChecker.getInstance().addSession( localSession);
- }
- catch( Throwable t )
- {
- future.setException( t );
- }
-
-
-
- return future;
- }
-
- public IoServiceConfig getDefaultConfig()
- {
- return defaultConfig;
- }
-} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index 4b8a0baf75..742d6575df 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -23,5 +23,4 @@ package org.apache.qpid.transport.network;
public class Transport
{
public static final String TCP = "tcp";
- public static final String VM = "vm";
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java
deleted file mode 100644
index acc55c2e2d..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
-public class VMBrokerMap
-{
- private static final Map<Integer, VmPipeAddress> _map = new HashMap<Integer, VmPipeAddress>();
-
- public static void add(int port, VmPipeAddress pipe)
- {
- _map.put(port, pipe);
- }
-
- public static VmPipeAddress remove(int port)
- {
- return _map.remove(port);
- }
-
- public static void clear()
- {
- _map.clear();
- }
-
- public static boolean contains(int port)
- {
- return _map.containsKey(port);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
index 62f9429f30..d0367b82f4 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -36,8 +36,6 @@ import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -51,7 +49,6 @@ import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
-import org.apache.qpid.transport.network.VMBrokerMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +56,6 @@ public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingN
{
private static final int UNKNOWN = -1;
private static final int TCP = 0;
- private static final int VM = 1;
public NetworkConnection _connection;
private SocketAcceptor _acceptor;
@@ -83,16 +79,6 @@ public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingN
});
_connection = stc.connect(delegate, settings, sslFactory);
break;
- case VM:
- stc = new IoConnectorCreator(new SocketConnectorFactory()
- {
- public IoConnector newConnector()
- {
- return new QpidVmPipeConnector();
- }
- });
- _connection = stc.connect(delegate, settings, sslFactory);
- break;
case UNKNOWN:
default:
throw new TransportException("Unknown protocol: " + settings.getProtocol());
@@ -108,12 +94,7 @@ public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingN
return TCP;
}
- if (transport.equals(Transport.VM))
- {
- return VM;
- }
-
- return -1;
+ return UNKNOWN;
}
public void close()
@@ -198,18 +179,6 @@ public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingN
{
address = new InetSocketAddress(settings.getHost(), port);
}
- else if(Transport.VM.equalsIgnoreCase(protocol))
- {
- synchronized (VMBrokerMap.class)
- {
- if(!VMBrokerMap.contains(port))
- {
- throw new TransportException("VM broker on port " + port + " does not exist.");
- }
- }
-
- address = new VmPipeAddress(port);
- }
else
{
throw new TransportException("Unknown transport: " + protocol);
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
index 6f21c327e7..e261860bf3 100644
--- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
+++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
@@ -31,9 +31,6 @@ public class URLHelper
public static void parseOptions(Map<String, String> optionMap, String options) throws URLSyntaxException
{
- // options looks like this
- // brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value''
-
if ((options == null) || (options.indexOf('=') == -1))
{
return;
diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
index 8b470d555e..808374b06e 100644
--- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -31,6 +31,7 @@ import junit.framework.TestCase;
import junit.framework.TestResult;
import org.apache.log4j.Logger;
+import org.apache.mina.util.AvailablePortFinder;
public class QpidTestCase extends TestCase
{
@@ -127,4 +128,9 @@ public class QpidTestCase extends TestCase
return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ;
}
+
+ public int findFreePort()
+ {
+ return AvailablePortFinder.getNextAvailable(10000);
+ }
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
index a4292d9009..1a1b5af805 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.mina.util.AvailablePortFinder;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -61,7 +60,7 @@ public class MinaNetworkHandlerTest extends QpidTestCase
public void setUp() throws Exception
{
String host = InetAddress.getLocalHost().getHostName();
- _testPort = AvailablePortFinder.getNextAvailable(10000);
+ _testPort = findFreePort();
_clientSettings = new ConnectionSettings();
_clientSettings.setHost(host);