diff options
Diffstat (limited to 'java')
25 files changed, 15 insertions, 3423 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 77997a3685..78ac5747ea 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -33,7 +33,7 @@ <enabled>true</enabled> </management> <advanced> - <filterchain enableExecutorPool="true"/> + <filterchain enableExecutorPool="false"/> <enablePooledAllocator>false</enablePooledAllocator> <enableDirectBuffers>false</enableDirectBuffers> <framesize>65535</framesize> diff --git a/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java index e9e9c8aca4..6470d876bb 100644 --- a/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -19,6 +19,8 @@ package org.apache.qpid.server.transport; import org.apache.qpid.configuration.Configured; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.filter.executor.ExecutorExecutor; +import org.apache.mina.util.NewThreadExecutor; public class ConnectorConfiguration { @@ -74,20 +76,8 @@ public class ConnectorConfiguration defaultValue = "true") public boolean enableNonSSL; - @Configured(path = "advanced.useBlockingIo", - defaultValue = "false") - public boolean useBlockingIo; - public IoAcceptor createAcceptor() { - if(useBlockingIo) - { - System.out.println("Using blocking io"); - return new org.apache.qpid.bio.SocketAcceptor(); - } - else - { - return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors); - } + return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); } } diff --git a/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java b/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java index 101ba7dd36..f985050e9f 100644 --- a/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java +++ b/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java @@ -49,6 +49,11 @@ public class MockIoSession implements IoSession return null; //To change body of implemented methods use File | Settings | File Templates. } + public IoServiceConfig getServiceConfig() + { + return null; + } + public IoHandler getHandler() { return null; //To change body of implemented methods use File | Settings | File Templates. diff --git a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java index 40ea24efbf..4dd67c48d9 100644 --- a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java +++ b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java @@ -156,6 +156,11 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } + public IoServiceConfig getServiceConfig() + { + return null; + } + public IoHandler getHandler() { return null; //To change body of implemented methods use File | Settings | File Templates. diff --git a/java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jar b/java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jar Binary files differnew file mode 100644 index 0000000000..20a16877bd --- /dev/null +++ b/java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jar diff --git a/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar b/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar Binary files differdeleted file mode 100644 index 6fcbb64543..0000000000 --- a/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar +++ /dev/null diff --git a/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar Binary files differnew file mode 100644 index 0000000000..5e55c680ff --- /dev/null +++ b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar diff --git a/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar Binary files differdeleted file mode 100644 index 45e0333be1..0000000000 --- a/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar +++ /dev/null diff --git a/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar Binary files differnew file mode 100644 index 0000000000..f3a2350806 --- /dev/null +++ b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar diff --git a/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar Binary files differnew file mode 100644 index 0000000000..89f497a056 --- /dev/null +++ b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar diff --git a/java/common/src/org/apache/qpid/bio/Reader.java b/java/common/src/org/apache/qpid/bio/Reader.java deleted file mode 100644 index 165a323337..0000000000 --- a/java/common/src/org/apache/qpid/bio/Reader.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoHandler; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.ClosedByInterruptException; - -class Reader implements Runnable -{ - private final IoHandler handler; - private final SocketSessionImpl session; - private final ByteChannel channel; - private volatile boolean stopped; - - Reader(IoHandler handler, SocketSessionImpl session) - { - this.handler = handler; - this.session = session; - channel = session.getChannel(); - } - - void stop() - { - stopped = true; - } - - public void run() - { - while (!stopped) - { - try - { - ByteBuffer buffer = ByteBuffer.allocate(session.getReadBufferSize()); - int read = channel.read(buffer.buf()); - if(read > 0) - { - buffer.flip(); - ((SocketFilterChain) session.getFilterChain()).messageReceived(session, buffer); - } - else - { - stopped = true; - } - } - catch (ClosedByInterruptException e) - { - stopped = true; - } - catch (IOException e) - { - if (!stopped) - { - signalException(e); - session.close(); - } - } - catch (Exception e) - { - if (!stopped) - { - signalException(e); - } - } - } - } - - private void signalException(Exception e) - { - try - { - handler.exceptionCaught(session, e); - } - catch (Exception e2) - { - e.printStackTrace(); - } - } -} diff --git a/java/common/src/org/apache/qpid/bio/Sequence.java b/java/common/src/org/apache/qpid/bio/Sequence.java deleted file mode 100644 index dcaae4d6d7..0000000000 --- a/java/common/src/org/apache/qpid/bio/Sequence.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -class Sequence -{ - private int nextId = 0; - - synchronized int nextId() - { - return nextId++; - } -} diff --git a/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java b/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java deleted file mode 100644 index 0495654f73..0000000000 --- a/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; - -/** - * A simpler alternative to the non-blocking enabled SocketChannel for - * use with blocking io only. Not threadsafe. - */ -class SimpleSocketChannel implements ByteChannel -{ - private final Socket socket; - private final OutputStream out; - private final InputStream in; - private final byte[] buffer = new byte[2048]; - - SimpleSocketChannel(Socket socket) throws IOException - { - this.socket = socket; - out = socket.getOutputStream(); - in = socket.getInputStream(); - } - - Socket socket() - { - return socket; - } - - public int read(ByteBuffer dst) throws IOException - { - if (dst == null) - { - throw new NullPointerException("Null buffer passed into read"); - } - int read = in.read(buffer, 0, Math.min(buffer.length, dst.limit() - dst.position())); - if (read > 0) - { - dst.put(buffer, 0, read); - } - return read; - } - - public int write(ByteBuffer dst) throws IOException - { - byte[] data = new byte[dst.remaining()]; - dst.get(data); - out.write(data); - return data.length; - } - - public boolean isOpen() - { - return socket.isConnected(); - } - - public void close() throws IOException - { - socket.close(); - } -} diff --git a/java/common/src/org/apache/qpid/bio/SocketAcceptor.java b/java/common/src/org/apache/qpid/bio/SocketAcceptor.java deleted file mode 100644 index d47a29a047..0000000000 --- a/java/common/src/org/apache/qpid/bio/SocketAcceptor.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * @(#) $Id: SocketAcceptor.java 389042 2006-03-27 07:49:41Z trustin $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.BaseIoAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; -import java.nio.channels.ServerSocketChannel; -import java.util.*; - -/** - */ -public class SocketAcceptor extends BaseIoAcceptor -{ - private static final Sequence acceptorSeq = new Sequence(); - - private final int id = acceptorSeq.nextId(); - private final String threadName = "SocketAcceptor-" + id; - private final IoServiceConfig defaultConfig = new SocketAcceptorConfig(); - private final Map services = new HashMap();//SocketAddress => SocketBinding - - public SocketAcceptor() - { - } - - /** - * Binds to the specified <code>address</code> and handles incoming connections with the specified - * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. - * - * @throws IOException if failed to bind - */ - public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException - { - if (address == null) - { - throw new NullPointerException("address"); - } - - if (handler == null) - { - throw new NullPointerException("handler"); - } - - if (!(address instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); - } - - if (((InetSocketAddress) address).getPort() == 0) - { - throw new IllegalArgumentException("Unsupported port number: 0"); - } - - if (config == null) - { - config = getDefaultConfig(); - } - - SocketBinding service = new SocketBinding(address, handler, config); - synchronized (services) - { - services.put(address, service); - } - service.start(); - } - - public Set getManagedSessions(SocketAddress address) - { - if (address == null) - { - throw new NullPointerException("address"); - } - - SocketBinding service = (SocketBinding) services.get(address); - - if (service == null) - { - throw new IllegalArgumentException("Address not bound: " + address); - } - - return Collections.unmodifiableSet(new HashSet(service.sessions)); - } - - public void unbind(SocketAddress address) - { - if (address == null) - { - throw new NullPointerException("address"); - } - - SocketBinding service; - synchronized (services) - { - service = (SocketBinding) services.remove(address); - } - - if (service == null) - { - throw new IllegalArgumentException("Address not bound: " + address); - } - - try - { - service.unbind(); - } - catch (IOException e) - { - //TODO: handle properly - e.printStackTrace(); - } - } - - public void unbindAll() - { - synchronized (services) - { - for (Iterator i = services.entrySet().iterator(); i.hasNext();) - { - SocketBinding service = (SocketBinding) i.next(); - try - { - service.unbind(); - } - catch (IOException e) - { - //TODO: handle properly - e.printStackTrace(); - } - i.remove(); - } - } - } - - public boolean isBound(SocketAddress address) - { - synchronized (services) - { - return services.containsKey(address); - } - } - - public Set getBoundAddresses() - { - throw new UnsupportedOperationException("getBoundAddresses() not supported by blocking IO Acceptor"); - } - - public IoServiceConfig getDefaultConfig() - { - return defaultConfig; - } - - private class SocketBinding implements Runnable - { - private final SocketAddress address; - private final ServerSocketChannel service; - //private final ServerSocket service; - private final IoServiceConfig config; - private final IoHandler handler; - private final List sessions = new ArrayList(); - private volatile boolean stopped = false; - private Thread runner; - - SocketBinding(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException - { - this.address = address; - this.handler = handler; - this.config = config; - - service = ServerSocketChannel.open(); - service.socket().bind(address); - - //service = new ServerSocket(); - //service.bind(address); - } - - void unbind() throws IOException - { - stopped = true; - //shutdown all sessions - for (Iterator i = sessions.iterator(); i.hasNext();) - { - ((SocketSessionImpl) i.next()).close(); - i.remove(); - } - - //close server socket - service.close(); - if (runner != null) - { - try - { - runner.join(); - } - catch (InterruptedException e) - { - //ignore and return - System.err.println("Warning: interrupted on unbind(" + address + ")"); - } - } - } - - void start() - { - runner = new Thread(this); - runner.start(); - } - - public void run() - { - while (!stopped) - { - try - { - accept(); - } - catch (Exception e) - { - //handle this better... - e.printStackTrace(); - } - } - } - - private void accept() throws Exception - { - //accept(new SimpleSocketChannel(service.accept())); - accept(service.accept()); - } - - private void accept(ByteChannel channel) throws Exception - { - //SocketChannel channel; - //start session - SocketSessionImpl session = new SocketSessionImpl(SocketAcceptor.this, - (SocketSessionConfig) defaultConfig.getSessionConfig(), - handler, - channel, - address); - //signal start etc... - sessions.add(session); - - //TODO - //need to set up filter chains somehow... (this is copied from connector...) - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getThreadModel().buildFilterChain(session.getFilterChain()); - ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); - - session.start(); - //not sure if this will work... socket is already opened before the created callback is called... - ((SocketFilterChain) session.getFilterChain()).sessionOpened(session); - } - } -} diff --git a/java/common/src/org/apache/qpid/bio/SocketConnector.java b/java/common/src/org/apache/qpid/bio/SocketConnector.java deleted file mode 100644 index b107c44726..0000000000 --- a/java/common/src/org/apache/qpid/bio/SocketConnector.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.BaseIoConnector; -import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.nio.channels.ByteChannel; -import java.nio.channels.SocketChannel; - -/** - */ -public class SocketConnector extends BaseIoConnector -{ - /** - * @noinspection StaticNonFinalField - */ - private static final Sequence idSequence = new Sequence(); - - private final Object lock = new Object(); - private final String threadName = "SocketConnector-" + idSequence.nextId(); - private final IoServiceConfig defaultConfig = new SocketConnectorConfig(); - private final Set managedSessions = Collections.synchronizedSet(new HashSet()); - - /** - * Create a connector with a single processing thread - */ - public SocketConnector() - { - } - - public IoServiceConfig getDefaultConfig() - { - return defaultConfig; - } - - public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - return connect(address, null, handler, config); - } - - public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoServiceConfig config) - { - if (address == null) - { - throw new NullPointerException("address"); - } - if (handler == null) - { - throw new NullPointerException("handler"); - } - - if (! (address instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); - } - if (localAddress != null && !(localAddress instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected local address type: " + localAddress.getClass()); - } - if (config == null) - { - config = getDefaultConfig(); - } - - DefaultConnectFuture future = new DefaultConnectFuture(); - try - { - - //Socket socket = new Socket(); - //socket.connect(address); - //SimpleSocketChannel channel = new SimpleSocketChannel(socket); - //SocketAddress serviceAddress = socket.getRemoteSocketAddress(); - - SocketChannel channel = SocketChannel.open(address); - channel.configureBlocking(true); - SocketAddress serviceAddress = channel.socket().getRemoteSocketAddress(); - - - SocketSessionImpl session = newSession(channel, handler, config, channel.socket().getRemoteSocketAddress()); - future.setSession(session); - } - catch (IOException e) - { - future.setException(e); - } - - return future; - } - - private SocketSessionImpl newSession(ByteChannel channel, IoHandler handler, IoServiceConfig config, SocketAddress serviceAddress) - throws IOException - { - SocketSessionImpl session = new SocketSessionImpl(this, - (SocketSessionConfig) config.getSessionConfig(), - handler, - channel, - serviceAddress); - try - { - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getThreadModel().buildFilterChain(session.getFilterChain()); - ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); - - session.start(); - //not sure if this will work... socket is already opened before the created callback is called... - ((SocketFilterChain) session.getFilterChain()).sessionOpened(session); - } - catch (Throwable e) - { - throw (IOException) new IOException("Failed to create a session.").initCause(e); - } - - //TODO: figure out how the managed session are used/ what they are etc. - //session.getManagedSessions().add( session ); - - - return session; - } -} diff --git a/java/common/src/org/apache/qpid/bio/SocketFilterChain.java b/java/common/src/org/apache/qpid/bio/SocketFilterChain.java deleted file mode 100644 index f00a1535aa..0000000000 --- a/java/common/src/org/apache/qpid/bio/SocketFilterChain.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * @(#) $Id: SocketFilterChain.java 398039 2006-04-28 23:36:27Z proyal $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.support.AbstractIoFilterChain; - -import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; - -/** - */ -class SocketFilterChain extends AbstractIoFilterChain -{ - - SocketFilterChain(IoSession parent) - { - super(parent); - } - - protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception - { - SocketSessionImpl s = (SocketSessionImpl) session; - - //write to socket - try - { - s.getChannel().write(((ByteBuffer) writeRequest.getMessage()).buf()); - - //notify of completion - writeRequest.getFuture().setWritten(true); - } - catch(ClosedByInterruptException e) - { - writeRequest.getFuture().setWritten(false); - } - } - - protected void doClose(IoSession session) throws IOException - { - SocketSessionImpl s = (SocketSessionImpl) session; - s.shutdown(); - } -} diff --git a/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java b/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java deleted file mode 100644 index d534093533..0000000000 --- a/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.bio; - -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoService; -import org.apache.mina.common.IoSessionConfig; -import org.apache.mina.common.RuntimeIOException; -import org.apache.mina.common.TransportType; -import org.apache.mina.common.support.BaseIoSession; -import org.apache.mina.common.support.BaseIoSessionConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfigImpl; - -import java.io.IOException; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SocketChannel; - -/** - */ -class SocketSessionImpl extends BaseIoSession -{ - private final IoService manager; - private final SocketSessionConfig config; - private final SocketFilterChain filterChain; - private final IoHandler handler; - private final SocketAddress remoteAddress; - private final SocketAddress localAddress; - private final SocketAddress serviceAddress; - private final Socket socket; - private final ByteChannel channel; - private final Reader reader; - private Thread runner; - private int readBufferSize; - - /** - * Creates a new instance. - */ - SocketSessionImpl(IoService manager, - SocketSessionConfig config, - IoHandler handler, - ByteChannel channel, - SocketAddress serviceAddress) throws IOException - { - this.manager = manager; - this.filterChain = new SocketFilterChain(this); - this.handler = handler; - this.channel = channel; - if(channel instanceof SocketChannel) - { - socket = ((SocketChannel) channel).socket(); - } - else if(channel instanceof SimpleSocketChannel) - { - socket = ((SimpleSocketChannel) channel).socket(); - } - else - { - throw new IllegalArgumentException("Unrecognised channel type: " + channel.getClass()); - } - - this.remoteAddress = socket.getRemoteSocketAddress(); - this.localAddress = socket.getLocalSocketAddress(); - this.serviceAddress = serviceAddress; - - this.config = new SessionConfigImpl(config); - - reader = new Reader(handler, this); - } - - void start() - { - //create & start thread for this... - runner = new Thread(reader); - runner.start(); - } - - void shutdown() throws IOException - { - filterChain.sessionClosed( this ); - reader.stop(); - channel.close(); - } - - ByteChannel getChannel() - { - return channel; - } - - protected void write0(WriteRequest writeRequest) - { - filterChain.filterWrite(this, writeRequest); - } - - protected void close0() - { - filterChain.filterClose(this); - super.close0(); - } - - protected void updateTrafficMask() - { - //TODO - } - - public IoService getService() - { - return manager; - } - - public IoSessionConfig getConfig() - { - return config; - } - - public IoFilterChain getFilterChain() - { - return filterChain; - } - - public IoHandler getHandler() - { - return handler; - } - - public int getScheduledWriteRequests() - { - return 0; - } - - public int getScheduledWriteBytes() - { - return 0; - } - - public TransportType getTransportType() - { - return TransportType.SOCKET; - } - - public SocketAddress getRemoteAddress() - { - return remoteAddress; - } - - public SocketAddress getLocalAddress() - { - return localAddress; - } - - public SocketAddress getServiceAddress() - { - return serviceAddress; - } - - int getReadBufferSize() - { - return readBufferSize; - } - - private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig - { - SessionConfigImpl() - { - } - - SessionConfigImpl(SocketSessionConfig cfg) - { - setKeepAlive(cfg.isKeepAlive()); - setOobInline(cfg.isOobInline()); - setReceiveBufferSize(cfg.getReceiveBufferSize()); - readBufferSize = cfg.getReceiveBufferSize(); - setReuseAddress(cfg.isReuseAddress()); - setSendBufferSize(cfg.getSendBufferSize()); - setSoLinger(cfg.getSoLinger()); - setTcpNoDelay(cfg.isTcpNoDelay()); - if (getTrafficClass() != cfg.getTrafficClass()) - { - setTrafficClass(cfg.getTrafficClass()); - } - } - - - public boolean isKeepAlive() - { - try - { - return socket.getKeepAlive(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setKeepAlive(boolean on) - { - try - { - socket.setKeepAlive(on); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public boolean isOobInline() - { - try - { - return socket.getOOBInline(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setOobInline(boolean on) - { - try - { - socket.setOOBInline(on); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public boolean isReuseAddress() - { - try - { - return socket.getReuseAddress(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setReuseAddress(boolean on) - { - try - { - socket.setReuseAddress(on); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public int getSoLinger() - { - try - { - return socket.getSoLinger(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setSoLinger(int linger) - { - try - { - if (linger < 0) - { - socket.setSoLinger(false, 0); - } - else - { - socket.setSoLinger(true, linger); - } - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public boolean isTcpNoDelay() - { - try - { - return socket.getTcpNoDelay(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setTcpNoDelay(boolean on) - { - try - { - socket.setTcpNoDelay(on); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public int getTrafficClass() - { - if (SocketSessionConfigImpl.isGetTrafficClassAvailable()) - { - try - { - return socket.getTrafficClass(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - else - { - return 0; - } - } - - public void setTrafficClass(int tc) - { - if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) - { - try - { - socket.setTrafficClass(tc); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - } - - public int getSendBufferSize() - { - try - { - return socket.getSendBufferSize(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setSendBufferSize(int size) - { - if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) - { - try - { - socket.setSendBufferSize(size); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - } - - public int getReceiveBufferSize() - { - try - { - return socket.getReceiveBufferSize(); - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - - public void setReceiveBufferSize(int size) - { - if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) - { - try - { - socket.setReceiveBufferSize(size); - SocketSessionImpl.this.readBufferSize = size; - } - catch (SocketException e) - { - throw new RuntimeIOException(e); - } - } - } - } -} diff --git a/java/common/src/org/apache/qpid/nio/SocketAcceptor.java b/java/common/src/org/apache/qpid/nio/SocketAcceptor.java deleted file mode 100644 index 1033d16191..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketAcceptor.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.nio; - -import org.apache.mina.common.support.DelegatedIoAcceptor; - -public class SocketAcceptor extends DelegatedIoAcceptor -{ - /** - * Creates a new instance. - */ - public SocketAcceptor() - { - init(new SocketAcceptorDelegate(this)); - } -} diff --git a/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java b/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java deleted file mode 100644 index 7339482ac0..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java +++ /dev/null @@ -1,602 +0,0 @@ -/* - * @(#) $Id: SocketAcceptorDelegate.java 379346 2006-02-21 05:10:30Z trustin $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.nio; - -import org.apache.mina.common.*; -import org.apache.mina.common.support.BaseIoAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.IdentityHashSet; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.*; - -/** - * {@link IoAcceptor} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev: 379346 $, $Date: 2006-02-21 05:10:30 +0000 (Tue, 21 Feb 2006) $ - */ -public class SocketAcceptorDelegate extends BaseIoAcceptor -{ - private static volatile int nextId = 0; - - private final IoAcceptor wrapper; - private final int id = nextId ++; - private final String threadName = "SocketAcceptor-" + id; - private final IoServiceConfig defaultConfig = new SocketAcceptorConfig(); - private Selector selector; - private final Map channels = new HashMap(); - private final Hashtable sessions = new Hashtable(); - - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); - - private Worker worker; - - /** - * Creates a new instance. - */ - public SocketAcceptorDelegate(IoAcceptor wrapper) - { - this.wrapper = wrapper; - } - - /** - * Binds to the specified <code>address</code> and handles incoming - * connections with the specified <code>handler</code>. Backlog value - * is configured to the value of <code>backlog</code> property. - * - * @throws IOException if failed to bind - */ - public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException - { - if (address == null) - { - throw new NullPointerException("address"); - } - - if (handler == null) - { - throw new NullPointerException("handler"); - } - - if (!(address instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected address type: " + address.getClass()); - } - - if (((InetSocketAddress) address).getPort() == 0) - { - throw new IllegalArgumentException("Unsupported port number: 0"); - } - - if (config == null) - { - config = getDefaultConfig(); - } - - RegistrationRequest request = new RegistrationRequest(address, handler, config); - - synchronized (this) - { - synchronized (registerQueue) - { - registerQueue.push(request); - } - startupWorker(); - } - - selector.wakeup(); - - synchronized (request) - { - while (!request.done) - { - try - { - request.wait(); - } - catch (InterruptedException e) - { - } - } - } - - if (request.exception != null) - { - throw request.exception; - } - } - - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - - worker.start(); - } - } - - public Set getManagedSessions(SocketAddress address) - { - if (address == null) - { - throw new NullPointerException("address"); - } - - Set managedSessions = (Set) sessions.get(address); - - if (managedSessions == null) - { - throw new IllegalArgumentException("Address not bound: " + address); - } - - return Collections.unmodifiableSet( - new IdentityHashSet(Arrays.asList(managedSessions.toArray()))); - } - - public void unbind(SocketAddress address) - { - if (address == null) - { - throw new NullPointerException("address"); - } - - final Set managedSessions = (Set) sessions.get(address); - CancellationRequest request = new CancellationRequest(address); - synchronized (this) - { - try - { - startupWorker(); - } - catch (IOException e) - { - // IOException is thrown only when Worker thread is not - // running and failed to open a selector. We simply throw - // IllegalArgumentException here because we can simply - // conclude that nothing is bound to the selector. - throw new IllegalArgumentException("Address not bound: " + address); - } - - synchronized (cancelQueue) - { - cancelQueue.push(request); - } - } - - selector.wakeup(); - - synchronized (request) - { - while (!request.done) - { - try - { - request.wait(); - } - catch (InterruptedException e) - { - } - } - } - - if (request.exception != null) - { - request.exception.fillInStackTrace(); - - throw request.exception; - } - - // Disconnect all clients - IoServiceConfig cfg = request.registrationRequest.config; - boolean disconnectOnUnbind; - if (cfg instanceof IoAcceptorConfig) - { - disconnectOnUnbind = ((IoAcceptorConfig) cfg).isDisconnectOnUnbind(); - } - else - { - disconnectOnUnbind = ((IoAcceptorConfig) getDefaultConfig()).isDisconnectOnUnbind(); - } - - if (disconnectOnUnbind && managedSessions != null) - { - IoSession[] tempSessions = (IoSession[]) - managedSessions.toArray(new IoSession[ 0 ]); - - final Object lock = new Object(); - - for (int i = 0; i < tempSessions.length; i++) - { - if (!managedSessions.contains(tempSessions[i])) - { - // The session has already been closed and have been - // removed from managedSessions by the SocketIoProcessor. - continue; - } - tempSessions[i].close().setCallback(new IoFuture.Callback() - { - public void operationComplete(IoFuture future) - { - synchronized (lock) - { - lock.notify(); - } - } - }); - } - - try - { - synchronized (lock) - { - while (!managedSessions.isEmpty()) - { - lock.wait(1000); - } - } - } - catch (InterruptedException ie) - { - // Ignored - } - } - } - - public void unbindAll() - { - List addresses; - synchronized (channels) - { - addresses = new ArrayList(channels.keySet()); - } - - for (Iterator i = addresses.iterator(); i.hasNext();) - { - unbind((SocketAddress) i.next()); - } - } - - public boolean isBound(SocketAddress address) - { - synchronized (channels) - { - return channels.containsKey(address); - } - } - - public Set getBoundAddresses() - { - return wrapper.getBoundAddresses(); - } - - private class Worker extends Thread - { - public Worker() - { - super(SocketAcceptorDelegate.this.threadName); - } - - public void run() - { - for (; ;) - { - try - { - int nKeys = selector.select(); - - registerNew(); - cancelKeys(); - - if (nKeys > 0) - { - processSessions(selector.selectedKeys()); - } - - if (selector.keys().isEmpty()) - { - synchronized (SocketAcceptorDelegate.this) - { - if (selector.keys().isEmpty() && - registerQueue.isEmpty() && - cancelQueue.isEmpty()) - { - worker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - } - } - } - } - - private void processSessions(Set keys) throws IOException - { - Iterator it = keys.iterator(); - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - it.remove(); - - if (!key.isAcceptable()) - { - continue; - } - - ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); - - SocketChannel ch = ssc.accept(); - - if (ch == null) - { - continue; - } - - boolean success = false; - SocketSessionImpl session = null; - try - { - RegistrationRequest req = (RegistrationRequest) key.attachment(); - session = new SocketSessionImpl( - SocketAcceptorDelegate.this.wrapper, - (Set) sessions.get(req.address), - (SocketSessionConfig) req.config.getSessionConfig(), - ch, req.handler, - req.address); - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - req.config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); - session.getManagedSessions().add(session); - session.getIoProcessor().addNew(session); - success = true; - } - catch (Throwable t) - { - ExceptionMonitor.getInstance().exceptionCaught(t); - } - finally - { - if (!success) - { - if (session != null) - { - session.getManagedSessions().remove(session); - } - ch.close(); - } - } - } - } - } - - public IoServiceConfig getDefaultConfig() - { - return defaultConfig; - } - - private void registerNew() - { - if (registerQueue.isEmpty()) - { - return; - } - - for (; ;) - { - RegistrationRequest req; - - synchronized (registerQueue) - { - req = (RegistrationRequest) registerQueue.pop(); - } - - if (req == null) - { - break; - } - - ServerSocketChannel ssc = null; - - try - { - ssc = ServerSocketChannel.open(); - ssc.configureBlocking(false); - - // Configure the server socket, - SocketAcceptorConfig cfg; - if (req.config instanceof SocketAcceptorConfig) - { - cfg = (SocketAcceptorConfig) req.config; - } - else - { - cfg = (SocketAcceptorConfig) getDefaultConfig(); - } - - ssc.socket().setReuseAddress(cfg.isReuseAddress()); - ssc.socket().setReceiveBufferSize( - ((SocketSessionConfig) cfg.getSessionConfig()).getReceiveBufferSize()); - - // and bind. - ssc.socket().bind(req.address, cfg.getBacklog()); - ssc.register(selector, SelectionKey.OP_ACCEPT, req); - - synchronized (channels) - { - channels.put(req.address, ssc); - } - sessions.put(req.address, Collections.synchronizedSet(new HashSet())); - } - catch (IOException e) - { - req.exception = e; - } - finally - { - synchronized (req) - { - req.done = true; - - req.notify(); - } - - if (ssc != null && req.exception != null) - { - try - { - ssc.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - } - - - private void cancelKeys() - { - if (cancelQueue.isEmpty()) - { - return; - } - - for (; ;) - { - CancellationRequest request; - - synchronized (cancelQueue) - { - request = (CancellationRequest) cancelQueue.pop(); - } - - if (request == null) - { - break; - } - - sessions.remove(request.address); - ServerSocketChannel ssc; - synchronized (channels) - { - ssc = (ServerSocketChannel) channels.remove(request.address); - } - - // close the channel - try - { - if (ssc == null) - { - request.exception = new IllegalArgumentException("Address not bound: " + request.address); - } - else - { - SelectionKey key = ssc.keyFor(selector); - request.registrationRequest = (RegistrationRequest) key.attachment(); - key.cancel(); - - selector.wakeup(); // wake up again to trigger thread death - - ssc.close(); - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - synchronized (request) - { - request.done = true; - request.notify(); - } - } - } - } - - private static class RegistrationRequest - { - private final SocketAddress address; - private final IoHandler handler; - private final IoServiceConfig config; - private IOException exception; - private boolean done; - - private RegistrationRequest(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - this.address = address; - this.handler = handler; - this.config = config; - } - } - - - private static class CancellationRequest - { - private final SocketAddress address; - private boolean done; - private RegistrationRequest registrationRequest; - private RuntimeException exception; - - private CancellationRequest(SocketAddress address) - { - this.address = address; - } - } -} diff --git a/java/common/src/org/apache/qpid/nio/SocketConnector.java b/java/common/src/org/apache/qpid/nio/SocketConnector.java deleted file mode 100644 index ce74fd0c96..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketConnector.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.nio; - -import org.apache.mina.common.support.DelegatedIoConnector; -import org.apache.mina.transport.socket.nio.support.*; - -public class SocketConnector extends DelegatedIoConnector -{ - /** - * Creates a new instance. - */ - public SocketConnector() - { - init(new SocketConnectorDelegate(this)); - } -}
\ No newline at end of file diff --git a/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java b/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java deleted file mode 100644 index 50c122d1c8..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java +++ /dev/null @@ -1,402 +0,0 @@ -/* - * @(#) $Id: SocketConnectorDelegate.java 379044 2006-02-20 07:40:37Z trustin $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.nio; - -import org.apache.mina.common.*; -import org.apache.mina.common.support.BaseIoConnector; -import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -/** - * {@link IoConnector} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev: 379044 $, $Date: 2006-02-20 07:40:37 +0000 (Mon, 20 Feb 2006) $ - */ -public class SocketConnectorDelegate extends BaseIoConnector -{ - private static volatile int nextId = 0; - - private final IoConnector wrapper; - private final int id = nextId++; - private final String threadName = "SocketConnector-" + id; - private final IoServiceConfig defaultConfig = new SocketConnectorConfig(); - private Selector selector; - private final Queue connectQueue = new Queue(); - private final Set managedSessions = Collections.synchronizedSet(new HashSet()); - private Worker worker; - - /** - * Creates a new instance. - */ - public SocketConnectorDelegate(IoConnector wrapper) - { - this.wrapper = wrapper; - } - - public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - return connect(address, null, handler, config); - } - - public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoServiceConfig config) - { - if (address == null) - { - throw new NullPointerException("address"); - } - if (handler == null) - { - throw new NullPointerException("handler"); - } - - if (! (address instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected address type: " - + address.getClass()); - } - - if (localAddress != null && !(localAddress instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected local address type: " - + localAddress.getClass()); - } - - if (config == null) - { - config = getDefaultConfig(); - } - - SocketChannel ch = null; - boolean success = false; - try - { - ch = SocketChannel.open(); - ch.socket().setReuseAddress(true); - if (localAddress != null) - { - ch.socket().bind(localAddress); - } - - ch.configureBlocking(false); - - if (ch.connect(address)) - { - SocketSessionImpl session = newSession(ch, handler, config); - success = true; - ConnectFuture future = new DefaultConnectFuture(); - future.setSession(session); - return future; - } - - success = true; - } - catch (IOException e) - { - return DefaultConnectFuture.newFailedFuture(e); - } - finally - { - if (!success && ch != null) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - - ConnectionRequest request = new ConnectionRequest(ch, handler, config); - synchronized (this) - { - try - { - startupWorker(); - } - catch (IOException e) - { - try - { - ch.close(); - } - catch (IOException e2) - { - ExceptionMonitor.getInstance().exceptionCaught(e2); - } - - return DefaultConnectFuture.newFailedFuture(e); - } - synchronized (connectQueue) - { - connectQueue.push(request); - } - selector.wakeup(); - } - - return request; - } - - public IoServiceConfig getDefaultConfig() - { - return defaultConfig; - } - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - worker.start(); - } - } - - private void registerNew() - { - if (connectQueue.isEmpty()) - { - return; - } - - for (; ;) - { - ConnectionRequest req; - synchronized (connectQueue) - { - req = (ConnectionRequest) connectQueue.pop(); - } - - if (req == null) - { - break; - } - - SocketChannel ch = req.channel; - try - { - ch.register(selector, SelectionKey.OP_CONNECT, req); - } - catch (IOException e) - { - req.setException(e); - } - } - } - - private void processSessions(Set keys) - { - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isConnectable()) - { - continue; - } - - SocketChannel ch = (SocketChannel) key.channel(); - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - boolean success = false; - try - { - ch.finishConnect(); - SocketSessionImpl session = newSession(ch, entry.handler, entry.config); - entry.setSession(session); - success = true; - } - catch (Throwable e) - { - entry.setException(e); - } - finally - { - key.cancel(); - if (!success) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - keys.clear(); - } - - private void processTimedOutSessions(Set keys) - { - long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isValid()) - { - continue; - } - - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - if (currentTime >= entry.deadline) - { - entry.setException(new ConnectException()); - key.cancel(); - } - } - } - - private SocketSessionImpl newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config) throws IOException - { - SocketSessionImpl session = new SocketSessionImpl( - wrapper, managedSessions, - config.getSessionConfig(), - ch, handler, ch.socket().getRemoteSocketAddress()); - try - { - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - ((SocketFilterChain) session.getFilterChain()).sessionCreated(session); - } - catch (Throwable e) - { - throw (IOException) new IOException("Failed to create a session.").initCause(e); - } - session.getManagedSessions().add(session); - session.getIoProcessor().addNew(session); - return session; - } - - private class Worker extends Thread - { - public Worker() - { - super(SocketConnectorDelegate.this.threadName); - } - - public void run() - { - for (; ;) - { - try - { - int nKeys = selector.select(1000); - - registerNew(); - - if (nKeys > 0) - { - processSessions(selector.selectedKeys()); - } - - processTimedOutSessions(selector.keys()); - - if (selector.keys().isEmpty()) - { - synchronized (SocketConnectorDelegate.this) - { - if (selector.keys().isEmpty() && - connectQueue.isEmpty()) - { - worker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - } - } - } - } - } - - private class ConnectionRequest extends DefaultConnectFuture - { - private final SocketChannel channel; - private final long deadline; - private final IoHandler handler; - private final IoServiceConfig config; - - private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) - { - this.channel = channel; - long timeout; - if (config instanceof IoConnectorConfig) - { - timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); - } - else - { - timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); - } - this.deadline = System.currentTimeMillis() + timeout; - this.handler = handler; - this.config = config; - } - } -}
\ No newline at end of file diff --git a/java/common/src/org/apache/qpid/nio/SocketFilterChain.java b/java/common/src/org/apache/qpid/nio/SocketFilterChain.java deleted file mode 100644 index 07143512b5..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketFilterChain.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.qpid.nio; - -import java.io.IOException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.support.AbstractIoFilterChain; -import org.apache.mina.util.Queue; - -/** - * An {@link IoFilterChain} for socket transport (TCP/IP). - * - * @author The Apache Directory Project - */ -class SocketFilterChain extends AbstractIoFilterChain { - - public SocketFilterChain( IoSession parent ) - { - super( parent ); - } - - protected void doWrite( IoSession session, WriteRequest writeRequest ) - { - SocketSessionImpl s = ( SocketSessionImpl ) session; - Queue writeRequestQueue = s.getWriteRequestQueue(); - - // SocketIoProcessor.doFlush() will reset it after write is finished - // because the buffer will be passed with messageSent event. - ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); - synchronized( writeRequestQueue ) - { - writeRequestQueue.push( writeRequest ); - if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) - { - // Notify SocketIoProcessor only when writeRequestQueue was empty. - s.getIoProcessor().flush( s ); - } - } - } - - protected void doClose( IoSession session ) throws IOException - { - SocketSessionImpl s = ( SocketSessionImpl ) session; - s.getIoProcessor().remove( s ); - } -} diff --git a/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java b/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java deleted file mode 100644 index d20ab41b96..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java +++ /dev/null @@ -1,770 +0,0 @@ -/* - * @(#) $Id: SocketIoProcessor.java 372449 2006-01-26 05:24:58Z trustin $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.nio; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.WriteTimeoutException; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -/** - * Performs all I/O operations for sockets which is connected or bound. - * This class is used by MINA internally. - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev: 372449 $, $Date: 2006-01-26 05:24:58 +0000 (Thu, 26 Jan 2006) $, - */ -class SocketIoProcessor -{ - private static final Logger _logger = Logger.getLogger(SocketIoProcessor.class); - - private static final String PROCESSORS_PROPERTY = "mina.socket.processors"; - private static final String THREAD_PREFIX = "SocketIoProcessor-"; - private static final int DEFAULT_PROCESSORS = 1; - private static final int PROCESSOR_COUNT; - private static final SocketIoProcessor[] PROCESSORS; - - private static int nextId; - - static - { - PROCESSOR_COUNT = configureProcessorCount(); - PROCESSORS = createProcessors(); - } - - /** - * Returns the {@link SocketIoProcessor} to be used for a newly - * created session - * - * @return The processor to be employed - */ - static synchronized SocketIoProcessor getInstance() - { - SocketIoProcessor processor = PROCESSORS[nextId ++]; - nextId %= PROCESSOR_COUNT; - return processor; - } - - private final String threadName; - private Selector selector; - - private final Queue newSessions = new Queue(); - private final Queue removingSessions = new Queue(); - private final Queue flushingSessions = new Queue(); - private final Queue trafficControllingSessions = new Queue(); - - private Worker worker; - private long lastIdleCheckTime = System.currentTimeMillis(); - - private SocketIoProcessor(String threadName) - { - this.threadName = threadName; - } - - void addNew(SocketSessionImpl session) throws IOException - { - synchronized (this) - { - synchronized (newSessions) - { - newSessions.push(session); - } - startupWorker(); - } - - selector.wakeup(); - } - - void remove(SocketSessionImpl session) throws IOException - { - scheduleRemove(session); - startupWorker(); - selector.wakeup(); - } - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - worker.start(); - } - } - - void flush(SocketSessionImpl session) - { - scheduleFlush(session); - Selector selector = this.selector; - if (selector != null) - { - selector.wakeup(); - } - } - - void updateTrafficMask(SocketSessionImpl session) - { - scheduleTrafficControl(session); - Selector selector = this.selector; - if (selector != null) - { - selector.wakeup(); - } - } - - private void scheduleRemove(SocketSessionImpl session) - { - synchronized (removingSessions) - { - removingSessions.push(session); - } - } - - private void scheduleFlush(SocketSessionImpl session) - { - synchronized (flushingSessions) - { - flushingSessions.push(session); - } - } - - private void scheduleTrafficControl(SocketSessionImpl session) - { - synchronized (trafficControllingSessions) - { - trafficControllingSessions.push(session); - } - } - - private void doAddNew() - { - if (newSessions.isEmpty()) - { - return; - } - - SocketSessionImpl session; - - for (; ;) - { - synchronized (newSessions) - { - session = (SocketSessionImpl) newSessions.pop(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - boolean registered; - - try - { - ch.configureBlocking(false); - session.setSelectionKey(ch.register(selector, - SelectionKey.OP_READ, - session)); - registered = true; - } - catch (IOException e) - { - session.getManagedSessions().remove(session); - registered = false; - ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); - } - - if (registered) - { - ((SocketFilterChain) session.getFilterChain()).sessionOpened(session); - } - } - } - - private void doRemove() - { - if (removingSessions.isEmpty()) - { - return; - } - - for (; ;) - { - SocketSessionImpl session; - - synchronized (removingSessions) - { - session = (SocketSessionImpl) removingSessions.pop(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.close() is called before addSession() is processed) - if (key == null) - { - scheduleRemove(session); - break; - } - // skip if channel is already closed - if (!key.isValid()) - { - continue; - } - - try - { - key.cancel(); - ch.close(); - } - catch (IOException e) - { - ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); - } - finally - { - releaseWriteBuffers(session); - session.getManagedSessions().remove(session); - - ((SocketFilterChain) session.getFilterChain()).sessionClosed(session); - session.getCloseFuture().setClosed(); - } - } - } - - private void process(Set selectedKeys) - { - Iterator it = selectedKeys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - - if (key.isReadable() && session.getTrafficMask().isReadable()) - { - read(session); - } - - if (key.isWritable() && session.getTrafficMask().isWritable()) - { - scheduleFlush(session); - } - } - - selectedKeys.clear(); - } - - private void read(SocketSessionImpl session) - { - ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); - SocketChannel ch = session.getChannel(); - - try - { - int readBytes = 0; - int ret; - - buf.clear(); - - try - { - while ((ret = ch.read(buf.buf())) > 0) - { - readBytes += ret; - } - } - finally - { - buf.flip(); - } - - session.increaseReadBytes(readBytes); - - if (readBytes > 0) - { - /*ByteBuffer newBuf = ByteBuffer.allocate(readBytes); - newBuf.put(buf); - newBuf.flip();*/ - //((SocketFilterChain) session.getFilterChain()).messageReceived(session, newBuf); - ((SocketFilterChain) session.getFilterChain()).messageReceived(session, buf); - } - if (ret < 0) - { - scheduleRemove(session); - } - } - catch (Throwable e) - { - if (e instanceof IOException) - { - scheduleRemove(session); - } - buf.release(); - ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); - } - /*finally - { - buf.release(); - } */ - } - - private void notifyIdleness() - { - // process idle sessions - long currentTime = System.currentTimeMillis(); - if ((currentTime - lastIdleCheckTime) >= 1000) - { - lastIdleCheckTime = currentTime; - Set keys = selector.keys(); - if (keys != null) - { - for (Iterator it = keys.iterator(); it.hasNext();) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - notifyIdleness(session, currentTime); - } - } - } - } - - private void notifyIdleness(SocketSessionImpl session, long currentTime) - { - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), - IdleStatus.BOTH_IDLE, - Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.READER_IDLE), - IdleStatus.READER_IDLE, - Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), - IdleStatus.WRITER_IDLE, - Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - - notifyWriteTimeout(session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime()); - } - - private void notifyIdleness0(SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime) - { - if (idleTime > 0 && lastIoTime != 0 - && (currentTime - lastIoTime) >= idleTime) - { - session.increaseIdleCount(status); - ((SocketFilterChain) session.getFilterChain()).sessionIdle(session, status); - } - } - - private void notifyWriteTimeout(SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime) - { - SelectionKey key = session.getSelectionKey(); - if (writeTimeout > 0 - && (currentTime - lastIoTime) >= writeTimeout - && key != null && key.isValid() - && (key.interestOps() & SelectionKey.OP_WRITE) != 0) - { - ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, new WriteTimeoutException()); - } - } - - private void doFlush() - { - if (flushingSessions.size() == 0) - { - return; - } - - for (; ;) - { - SocketSessionImpl session; - - synchronized (flushingSessions) - { - session = (SocketSessionImpl) flushingSessions.pop(); - } - - if (session == null) - { - break; - } - - if (!session.isConnected()) - { - releaseWriteBuffers(session); - continue; - } - - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.write() is called before addSession() is processed) - if (key == null) - { - scheduleFlush(session); - break; - } - // skip if channel is already closed - if (!key.isValid()) - { - continue; - } - - try - { - doFlush(session); - } - catch (IOException e) - { - scheduleRemove(session); - ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); - } - } - } - - private void releaseWriteBuffers(SocketSessionImpl session) - { - Queue writeRequestQueue = session.getWriteRequestQueue(); - WriteRequest req; - - while ((req = (WriteRequest) writeRequestQueue.pop()) != null) - { - try - { - ((ByteBuffer) req.getMessage()).release(); - } - catch (IllegalStateException e) - { - ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e); - } - finally - { - req.getFuture().setWritten(false); - } - } - } - - private void doFlush(SocketSessionImpl session) throws IOException - { - // Clear OP_WRITE - SelectionKey key = session.getSelectionKey(); - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - - SocketChannel ch = session.getChannel(); - Queue writeRequestQueue = session.getWriteRequestQueue(); - - WriteRequest req; - for (; ;) - { - synchronized (writeRequestQueue) - { - req = (WriteRequest) writeRequestQueue.first(); - } - - if (req == null) - { - break; - } - - ByteBuffer buf = (ByteBuffer) req.getMessage(); - if (buf.remaining() == 0) - { - synchronized (writeRequestQueue) - { - writeRequestQueue.pop(); - } - - session.increaseWrittenWriteRequests(); - buf.reset(); - ((SocketFilterChain) session.getFilterChain()).messageSent(session, req); - continue; - } - - int writtenBytes = ch.write(buf.buf()); - if (writtenBytes > 0) - { - session.increaseWrittenBytes(writtenBytes); - } - - if (buf.hasRemaining()) - { - //_logger.info("Kernel buf full"); - // Kernel buffer is full - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - selector.wakeup(); - break; - } - } - } - - private void doUpdateTrafficMask() - { - if (trafficControllingSessions.isEmpty()) - { - return; - } - - for (; ;) - { - SocketSessionImpl session; - - synchronized (trafficControllingSessions) - { - session = (SocketSessionImpl) trafficControllingSessions.pop(); - } - - if (session == null) - { - break; - } - - SelectionKey key = session.getSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.suspend??() or session.resume??() is - // called before addSession() is processed) - if (key == null) - { - scheduleTrafficControl(session); - break; - } - // skip if channel is already closed - if (!key.isValid()) - { - continue; - } - - // The normal is OP_READ and, if there are write requests in the - // session's write queue, set OP_WRITE to trigger flushing. - int ops = SelectionKey.OP_READ; - Queue writeRequestQueue = session.getWriteRequestQueue(); - synchronized (writeRequestQueue) - { - if (!writeRequestQueue.isEmpty()) - { - ops |= SelectionKey.OP_WRITE; - } - } - - // Now mask the preferred ops with the mask of the current session - int mask = session.getTrafficMask().getInterestOps(); - key.interestOps(ops & mask); - } - } - - /** - * Configures the number of processors employed. - * We first check for a system property "mina.IoProcessors". If this - * property is present and can be interpreted as an integer value greater - * or equal to 1, this value is used as the number of processors. - * Otherwise a default of 1 processor is employed. - * - * @return The nubmer of processors to employ - */ - private static int configureProcessorCount() - { - int processors = DEFAULT_PROCESSORS; - String processorProperty = System.getProperty(PROCESSORS_PROPERTY); - if (processorProperty != null) - { - try - { - processors = Integer.parseInt(processorProperty); - } - catch (NumberFormatException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - processors = Math.max(processors, 1); - - System.setProperty(PROCESSORS_PROPERTY, String.valueOf(processors)); - } - - return processors; - } - - private static SocketIoProcessor[] createProcessors() - { - SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ]; - for (int i = 0; i < PROCESSOR_COUNT; i ++) - { - processors[i] = new SocketIoProcessor(THREAD_PREFIX + i); - } - return processors; - } - - private class WorkerFlusher implements Runnable - { - private volatile boolean _shutdown = false; - - private volatile boolean _sleep = false; - - private final Object _lock = new Object(); - - public void run() - { - while (!_shutdown) - { - doFlush(); - try - { - sleep(); - } - catch (InterruptedException e) - { - // IGNORE - } - } - _logger.info("Flusher shutting down"); - } - - private void sleep() throws InterruptedException - { - synchronized (_lock) - { - while (_sleep && !_shutdown) - { - _logger.debug("Flusher going to sleep"); - _lock.wait(); - } - _sleep = true; - } - } - - void wakeup() - { - synchronized (_lock) - { - if (_sleep) - { - _logger.debug("Waking up flusher"); - _sleep = false; - _lock.notify(); - } - } - } - - void shutdown() - { - _shutdown = true; - wakeup(); - } - } - - private class Worker extends Thread - { - private WorkerFlusher _flusher; - - public Worker() - { - super(SocketIoProcessor.this.threadName); - _flusher = new WorkerFlusher(); - new Thread(_flusher, SocketIoProcessor.this.threadName + "Flusher").start(); - } - - public void run() - { - for (; ;) - { - try - { - int nKeys = selector.select(1000); - doAddNew(); - doUpdateTrafficMask(); - - if (nKeys > 0) - { - process(selector.selectedKeys()); - } - - //doFlush(); - // in case the flusher has gone to sleep we wake it up - if (flushingSessions.size() > 0) - { - _flusher.wakeup(); - } - doRemove(); - notifyIdleness(); - - if (selector.keys().isEmpty()) - { - synchronized (SocketIoProcessor.this) - { - if (selector.keys().isEmpty() && - newSessions.isEmpty()) - { - worker = null; - _flusher.shutdown(); - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch (Throwable t) - { - ExceptionMonitor.getInstance().exceptionCaught(t); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - } - } - } - } - } - -} diff --git a/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java b/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java deleted file mode 100644 index f7c74f7a14..0000000000 --- a/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * @(#) $Id: SocketSessionImpl.java 385247 2006-03-12 05:06:11Z trustin $ - * - * Copyright 2004 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.nio; - -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.*; -import org.apache.mina.common.support.BaseIoSession; -import org.apache.mina.common.support.BaseIoSessionConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.Queue; - -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.Set; - -/** - * An {@link IoSession} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (dev@directory.apache.org) - * @version $Rev: 385247 $, $Date: 2006-03-12 05:06:11 +0000 (Sun, 12 Mar 2006) $ - */ -class SocketSessionImpl extends BaseIoSession -{ - private final IoService manager; - private final SocketSessionConfig config = new SocketSessionConfigImpl(); - private final SocketIoProcessor ioProcessor; - private final SocketFilterChain filterChain; - private final SocketChannel ch; - private final Queue writeRequestQueue; - private final IoHandler handler; - private final SocketAddress remoteAddress; - private final SocketAddress localAddress; - private final SocketAddress serviceAddress; - private final Set managedSessions; - private SelectionKey key; - private int readBufferSize; - - /** - * Creates a new instance. - */ - public SocketSessionImpl( - IoService manager, Set managedSessions, - IoSessionConfig config, - SocketChannel ch, IoHandler defaultHandler, - SocketAddress serviceAddress ) - { - this.manager = manager; - this.managedSessions = managedSessions; - this.ioProcessor = SocketIoProcessor.getInstance(); - this.filterChain = new SocketFilterChain( this ); - this.ch = ch; - this.writeRequestQueue = new Queue(); - this.handler = defaultHandler; - this.remoteAddress = ch.socket().getRemoteSocketAddress(); - this.localAddress = ch.socket().getLocalSocketAddress(); - this.serviceAddress = serviceAddress; - - // Apply the initial session settings - if( config instanceof SocketSessionConfig ) - { - SocketSessionConfig cfg = ( SocketSessionConfig ) config; - this.config.setKeepAlive( cfg.isKeepAlive() ); - this.config.setOobInline( cfg.isOobInline() ); - this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() ); - this.readBufferSize = cfg.getReceiveBufferSize(); - this.config.setReuseAddress( cfg.isReuseAddress() ); - this.config.setSendBufferSize( cfg.getSendBufferSize() ); - this.config.setSoLinger( cfg.getSoLinger() ); - this.config.setTcpNoDelay( cfg.isTcpNoDelay() ); - - if( this.config.getTrafficClass() != cfg.getTrafficClass() ) - { - this.config.setTrafficClass( cfg.getTrafficClass() ); - } - } - } - - public IoService getService() - { - return manager; - } - - public IoSessionConfig getConfig() - { - return config; - } - - SocketIoProcessor getIoProcessor() - { - return ioProcessor; - } - - public IoFilterChain getFilterChain() - { - return filterChain; - } - - SocketChannel getChannel() - { - return ch; - } - - Set getManagedSessions() - { - return managedSessions; - } - - SelectionKey getSelectionKey() - { - return key; - } - - void setSelectionKey( SelectionKey key ) - { - this.key = key; - } - - public IoHandler getHandler() - { - return handler; - } - - protected void close0() - { - filterChain.filterClose( this ); - } - - Queue getWriteRequestQueue() - { - return writeRequestQueue; - } - - public int getScheduledWriteRequests() - { - synchronized( writeRequestQueue ) - { - return writeRequestQueue.size(); - } - } - - public int getScheduledWriteBytes() - { - synchronized( writeRequestQueue ) - { - return writeRequestQueue.byteSize(); - } - } - - protected void write0( WriteRequest writeRequest ) - { - filterChain.filterWrite( this, writeRequest ); - } - - public TransportType getTransportType() - { - return TransportType.SOCKET; - } - - public SocketAddress getRemoteAddress() - { - return remoteAddress; - } - - public SocketAddress getLocalAddress() - { - return localAddress; - } - - public SocketAddress getServiceAddress() - { - return serviceAddress; - } - - protected void updateTrafficMask() - { - this.ioProcessor.updateTrafficMask( this ); - } - - int getReadBufferSize() - { - return readBufferSize; - } - - private class SocketSessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig - { - public boolean isKeepAlive() - { - try - { - return ch.socket().getKeepAlive(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setKeepAlive( boolean on ) - { - try - { - ch.socket().setKeepAlive( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public boolean isOobInline() - { - try - { - return ch.socket().getOOBInline(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setOobInline( boolean on ) - { - try - { - ch.socket().setOOBInline( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public boolean isReuseAddress() - { - try - { - return ch.socket().getReuseAddress(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setReuseAddress( boolean on ) - { - try - { - ch.socket().setReuseAddress( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public int getSoLinger() - { - try - { - return ch.socket().getSoLinger(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setSoLinger( int linger ) - { - try - { - if( linger < 0 ) - { - ch.socket().setSoLinger( false, 0 ); - } - else - { - ch.socket().setSoLinger( true, linger ); - } - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public boolean isTcpNoDelay() - { - try - { - return ch.socket().getTcpNoDelay(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setTcpNoDelay( boolean on ) - { - try - { - ch.socket().setTcpNoDelay( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public int getTrafficClass() - { - try - { - return ch.socket().getTrafficClass(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setTrafficClass( int tc ) - { - try - { - ch.socket().setTrafficClass( tc ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public int getSendBufferSize() - { - try - { - return ch.socket().getSendBufferSize(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setSendBufferSize( int size ) - { - try - { - ch.socket().setSendBufferSize( size ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public int getReceiveBufferSize() - { - try - { - return ch.socket().getReceiveBufferSize(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setReceiveBufferSize( int size ) - { - try - { - ch.socket().setReceiveBufferSize( size ); - SocketSessionImpl.this.readBufferSize = size; - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - } -} diff --git a/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java index 5de2bf8d0e..bed9ac0b99 100644 --- a/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -18,7 +18,7 @@ package org.apache.qpid.pool; import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.ReferenceCountingIoFilter; +import org.apache.mina.filter.ReferenceCountingIoFilter; import org.apache.mina.common.ThreadModel; public class ReadWriteThreadModel implements ThreadModel |