summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/etc/config.xml2
-rw-r--r--java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java16
-rw-r--r--java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java5
-rw-r--r--java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java5
-rw-r--r--java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jarbin0 -> 326319 bytes
-rw-r--r--java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jarbin348936 -> 0 bytes
-rw-r--r--java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jarbin0 -> 299769 bytes
-rw-r--r--java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jarbin18413 -> 0 bytes
-rw-r--r--java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jarbin0 -> 17998 bytes
-rw-r--r--java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jarbin0 -> 2692 bytes
-rw-r--r--java/common/src/org/apache/qpid/bio/Reader.java98
-rw-r--r--java/common/src/org/apache/qpid/bio/Sequence.java29
-rw-r--r--java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java82
-rw-r--r--java/common/src/org/apache/qpid/bio/SocketAcceptor.java277
-rw-r--r--java/common/src/org/apache/qpid/bio/SocketConnector.java150
-rw-r--r--java/common/src/org/apache/qpid/bio/SocketFilterChain.java62
-rw-r--r--java/common/src/org/apache/qpid/bio/SocketSessionImpl.java421
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketAcceptor.java31
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java602
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketConnector.java32
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java402
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketFilterChain.java48
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketIoProcessor.java770
-rw-r--r--java/common/src/org/apache/qpid/nio/SocketSessionImpl.java404
-rw-r--r--java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java2
25 files changed, 15 insertions, 3423 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 77997a3685..78ac5747ea 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -33,7 +33,7 @@
<enabled>true</enabled>
</management>
<advanced>
- <filterchain enableExecutorPool="true"/>
+ <filterchain enableExecutorPool="false"/>
<enablePooledAllocator>false</enablePooledAllocator>
<enableDirectBuffers>false</enableDirectBuffers>
<framesize>65535</framesize>
diff --git a/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java
index e9e9c8aca4..6470d876bb 100644
--- a/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java
+++ b/java/broker/src/org/apache/qpid/server/transport/ConnectorConfiguration.java
@@ -19,6 +19,8 @@ package org.apache.qpid.server.transport;
import org.apache.qpid.configuration.Configured;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.filter.executor.ExecutorExecutor;
+import org.apache.mina.util.NewThreadExecutor;
public class ConnectorConfiguration
{
@@ -74,20 +76,8 @@ public class ConnectorConfiguration
defaultValue = "true")
public boolean enableNonSSL;
- @Configured(path = "advanced.useBlockingIo",
- defaultValue = "false")
- public boolean useBlockingIo;
-
public IoAcceptor createAcceptor()
{
- if(useBlockingIo)
- {
- System.out.println("Using blocking io");
- return new org.apache.qpid.bio.SocketAcceptor();
- }
- else
- {
- return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors);
- }
+ return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
}
}
diff --git a/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java b/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
index 101ba7dd36..f985050e9f 100644
--- a/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
+++ b/java/broker/test/src/org/apache/qpid/server/protocol/MockIoSession.java
@@ -49,6 +49,11 @@ public class MockIoSession implements IoSession
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public IoServiceConfig getServiceConfig()
+ {
+ return null;
+ }
+
public IoHandler getHandler()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
index 40ea24efbf..4dd67c48d9 100644
--- a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
+++ b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
@@ -156,6 +156,11 @@ public class BasicDeliverTest
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public IoServiceConfig getServiceConfig()
+ {
+ return null;
+ }
+
public IoHandler getHandler()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jar b/java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jar
new file mode 100644
index 0000000000..20a16877bd
--- /dev/null
+++ b/java/common/lib/backport-util-concurrent/backport-util-concurrent-2.2.jar
Binary files differ
diff --git a/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar b/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar
deleted file mode 100644
index 6fcbb64543..0000000000
--- a/java/common/lib/mina/mina-core-0.9.5-SNAPSHOT.jar
+++ /dev/null
Binary files differ
diff --git a/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..5e55c680ff
--- /dev/null
+++ b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
Binary files differ
diff --git a/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar
deleted file mode 100644
index 45e0333be1..0000000000
--- a/java/common/lib/mina/mina-filter-ssl-0.9.5-SNAPSHOT.jar
+++ /dev/null
Binary files differ
diff --git a/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..f3a2350806
--- /dev/null
+++ b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
Binary files differ
diff --git a/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..89f497a056
--- /dev/null
+++ b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
Binary files differ
diff --git a/java/common/src/org/apache/qpid/bio/Reader.java b/java/common/src/org/apache/qpid/bio/Reader.java
deleted file mode 100644
index 165a323337..0000000000
--- a/java/common/src/org/apache/qpid/bio/Reader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoHandler;
-
-import java.io.IOException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.ClosedByInterruptException;
-
-class Reader implements Runnable
-{
- private final IoHandler handler;
- private final SocketSessionImpl session;
- private final ByteChannel channel;
- private volatile boolean stopped;
-
- Reader(IoHandler handler, SocketSessionImpl session)
- {
- this.handler = handler;
- this.session = session;
- channel = session.getChannel();
- }
-
- void stop()
- {
- stopped = true;
- }
-
- public void run()
- {
- while (!stopped)
- {
- try
- {
- ByteBuffer buffer = ByteBuffer.allocate(session.getReadBufferSize());
- int read = channel.read(buffer.buf());
- if(read > 0)
- {
- buffer.flip();
- ((SocketFilterChain) session.getFilterChain()).messageReceived(session, buffer);
- }
- else
- {
- stopped = true;
- }
- }
- catch (ClosedByInterruptException e)
- {
- stopped = true;
- }
- catch (IOException e)
- {
- if (!stopped)
- {
- signalException(e);
- session.close();
- }
- }
- catch (Exception e)
- {
- if (!stopped)
- {
- signalException(e);
- }
- }
- }
- }
-
- private void signalException(Exception e)
- {
- try
- {
- handler.exceptionCaught(session, e);
- }
- catch (Exception e2)
- {
- e.printStackTrace();
- }
- }
-}
diff --git a/java/common/src/org/apache/qpid/bio/Sequence.java b/java/common/src/org/apache/qpid/bio/Sequence.java
deleted file mode 100644
index dcaae4d6d7..0000000000
--- a/java/common/src/org/apache/qpid/bio/Sequence.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-class Sequence
-{
- private int nextId = 0;
-
- synchronized int nextId()
- {
- return nextId++;
- }
-}
diff --git a/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java b/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java
deleted file mode 100644
index 0495654f73..0000000000
--- a/java/common/src/org/apache/qpid/bio/SimpleSocketChannel.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-
-/**
- * A simpler alternative to the non-blocking enabled SocketChannel for
- * use with blocking io only. Not threadsafe.
- */
-class SimpleSocketChannel implements ByteChannel
-{
- private final Socket socket;
- private final OutputStream out;
- private final InputStream in;
- private final byte[] buffer = new byte[2048];
-
- SimpleSocketChannel(Socket socket) throws IOException
- {
- this.socket = socket;
- out = socket.getOutputStream();
- in = socket.getInputStream();
- }
-
- Socket socket()
- {
- return socket;
- }
-
- public int read(ByteBuffer dst) throws IOException
- {
- if (dst == null)
- {
- throw new NullPointerException("Null buffer passed into read");
- }
- int read = in.read(buffer, 0, Math.min(buffer.length, dst.limit() - dst.position()));
- if (read > 0)
- {
- dst.put(buffer, 0, read);
- }
- return read;
- }
-
- public int write(ByteBuffer dst) throws IOException
- {
- byte[] data = new byte[dst.remaining()];
- dst.get(data);
- out.write(data);
- return data.length;
- }
-
- public boolean isOpen()
- {
- return socket.isConnected();
- }
-
- public void close() throws IOException
- {
- socket.close();
- }
-}
diff --git a/java/common/src/org/apache/qpid/bio/SocketAcceptor.java b/java/common/src/org/apache/qpid/bio/SocketAcceptor.java
deleted file mode 100644
index d47a29a047..0000000000
--- a/java/common/src/org/apache/qpid/bio/SocketAcceptor.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * @(#) $Id: SocketAcceptor.java 389042 2006-03-27 07:49:41Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.ServerSocketChannel;
-import java.util.*;
-
-/**
- */
-public class SocketAcceptor extends BaseIoAcceptor
-{
- private static final Sequence acceptorSeq = new Sequence();
-
- private final int id = acceptorSeq.nextId();
- private final String threadName = "SocketAcceptor-" + id;
- private final IoServiceConfig defaultConfig = new SocketAcceptorConfig();
- private final Map services = new HashMap();//SocketAddress => SocketBinding
-
- public SocketAcceptor()
- {
- }
-
- /**
- * Binds to the specified <code>address</code> and handles incoming connections with the specified
- * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
- *
- * @throws IOException if failed to bind
- */
- public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
-
- if (handler == null)
- {
- throw new NullPointerException("handler");
- }
-
- if (!(address instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected address type: " + address.getClass());
- }
-
- if (((InetSocketAddress) address).getPort() == 0)
- {
- throw new IllegalArgumentException("Unsupported port number: 0");
- }
-
- if (config == null)
- {
- config = getDefaultConfig();
- }
-
- SocketBinding service = new SocketBinding(address, handler, config);
- synchronized (services)
- {
- services.put(address, service);
- }
- service.start();
- }
-
- public Set getManagedSessions(SocketAddress address)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
-
- SocketBinding service = (SocketBinding) services.get(address);
-
- if (service == null)
- {
- throw new IllegalArgumentException("Address not bound: " + address);
- }
-
- return Collections.unmodifiableSet(new HashSet(service.sessions));
- }
-
- public void unbind(SocketAddress address)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
-
- SocketBinding service;
- synchronized (services)
- {
- service = (SocketBinding) services.remove(address);
- }
-
- if (service == null)
- {
- throw new IllegalArgumentException("Address not bound: " + address);
- }
-
- try
- {
- service.unbind();
- }
- catch (IOException e)
- {
- //TODO: handle properly
- e.printStackTrace();
- }
- }
-
- public void unbindAll()
- {
- synchronized (services)
- {
- for (Iterator i = services.entrySet().iterator(); i.hasNext();)
- {
- SocketBinding service = (SocketBinding) i.next();
- try
- {
- service.unbind();
- }
- catch (IOException e)
- {
- //TODO: handle properly
- e.printStackTrace();
- }
- i.remove();
- }
- }
- }
-
- public boolean isBound(SocketAddress address)
- {
- synchronized (services)
- {
- return services.containsKey(address);
- }
- }
-
- public Set getBoundAddresses()
- {
- throw new UnsupportedOperationException("getBoundAddresses() not supported by blocking IO Acceptor");
- }
-
- public IoServiceConfig getDefaultConfig()
- {
- return defaultConfig;
- }
-
- private class SocketBinding implements Runnable
- {
- private final SocketAddress address;
- private final ServerSocketChannel service;
- //private final ServerSocket service;
- private final IoServiceConfig config;
- private final IoHandler handler;
- private final List sessions = new ArrayList();
- private volatile boolean stopped = false;
- private Thread runner;
-
- SocketBinding(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException
- {
- this.address = address;
- this.handler = handler;
- this.config = config;
-
- service = ServerSocketChannel.open();
- service.socket().bind(address);
-
- //service = new ServerSocket();
- //service.bind(address);
- }
-
- void unbind() throws IOException
- {
- stopped = true;
- //shutdown all sessions
- for (Iterator i = sessions.iterator(); i.hasNext();)
- {
- ((SocketSessionImpl) i.next()).close();
- i.remove();
- }
-
- //close server socket
- service.close();
- if (runner != null)
- {
- try
- {
- runner.join();
- }
- catch (InterruptedException e)
- {
- //ignore and return
- System.err.println("Warning: interrupted on unbind(" + address + ")");
- }
- }
- }
-
- void start()
- {
- runner = new Thread(this);
- runner.start();
- }
-
- public void run()
- {
- while (!stopped)
- {
- try
- {
- accept();
- }
- catch (Exception e)
- {
- //handle this better...
- e.printStackTrace();
- }
- }
- }
-
- private void accept() throws Exception
- {
- //accept(new SimpleSocketChannel(service.accept()));
- accept(service.accept());
- }
-
- private void accept(ByteChannel channel) throws Exception
- {
- //SocketChannel channel;
- //start session
- SocketSessionImpl session = new SocketSessionImpl(SocketAcceptor.this,
- (SocketSessionConfig) defaultConfig.getSessionConfig(),
- handler,
- channel,
- address);
- //signal start etc...
- sessions.add(session);
-
- //TODO
- //need to set up filter chains somehow... (this is copied from connector...)
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getThreadModel().buildFilterChain(session.getFilterChain());
- ((SocketFilterChain) session.getFilterChain()).sessionCreated(session);
-
- session.start();
- //not sure if this will work... socket is already opened before the created callback is called...
- ((SocketFilterChain) session.getFilterChain()).sessionOpened(session);
- }
- }
-}
diff --git a/java/common/src/org/apache/qpid/bio/SocketConnector.java b/java/common/src/org/apache/qpid/bio/SocketConnector.java
deleted file mode 100644
index b107c44726..0000000000
--- a/java/common/src/org/apache/qpid/bio/SocketConnector.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- */
-public class SocketConnector extends BaseIoConnector
-{
- /**
- * @noinspection StaticNonFinalField
- */
- private static final Sequence idSequence = new Sequence();
-
- private final Object lock = new Object();
- private final String threadName = "SocketConnector-" + idSequence.nextId();
- private final IoServiceConfig defaultConfig = new SocketConnectorConfig();
- private final Set managedSessions = Collections.synchronizedSet(new HashSet());
-
- /**
- * Create a connector with a single processing thread
- */
- public SocketConnector()
- {
- }
-
- public IoServiceConfig getDefaultConfig()
- {
- return defaultConfig;
- }
-
- public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
- {
- return connect(address, null, handler, config);
- }
-
- public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
- IoHandler handler, IoServiceConfig config)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
- if (handler == null)
- {
- throw new NullPointerException("handler");
- }
-
- if (! (address instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected address type: " + address.getClass());
- }
- if (localAddress != null && !(localAddress instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected local address type: " + localAddress.getClass());
- }
- if (config == null)
- {
- config = getDefaultConfig();
- }
-
- DefaultConnectFuture future = new DefaultConnectFuture();
- try
- {
-
- //Socket socket = new Socket();
- //socket.connect(address);
- //SimpleSocketChannel channel = new SimpleSocketChannel(socket);
- //SocketAddress serviceAddress = socket.getRemoteSocketAddress();
-
- SocketChannel channel = SocketChannel.open(address);
- channel.configureBlocking(true);
- SocketAddress serviceAddress = channel.socket().getRemoteSocketAddress();
-
-
- SocketSessionImpl session = newSession(channel, handler, config, channel.socket().getRemoteSocketAddress());
- future.setSession(session);
- }
- catch (IOException e)
- {
- future.setException(e);
- }
-
- return future;
- }
-
- private SocketSessionImpl newSession(ByteChannel channel, IoHandler handler, IoServiceConfig config, SocketAddress serviceAddress)
- throws IOException
- {
- SocketSessionImpl session = new SocketSessionImpl(this,
- (SocketSessionConfig) config.getSessionConfig(),
- handler,
- channel,
- serviceAddress);
- try
- {
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getThreadModel().buildFilterChain(session.getFilterChain());
- ((SocketFilterChain) session.getFilterChain()).sessionCreated(session);
-
- session.start();
- //not sure if this will work... socket is already opened before the created callback is called...
- ((SocketFilterChain) session.getFilterChain()).sessionOpened(session);
- }
- catch (Throwable e)
- {
- throw (IOException) new IOException("Failed to create a session.").initCause(e);
- }
-
- //TODO: figure out how the managed session are used/ what they are etc.
- //session.getManagedSessions().add( session );
-
-
- return session;
- }
-}
diff --git a/java/common/src/org/apache/qpid/bio/SocketFilterChain.java b/java/common/src/org/apache/qpid/bio/SocketFilterChain.java
deleted file mode 100644
index f00a1535aa..0000000000
--- a/java/common/src/org/apache/qpid/bio/SocketFilterChain.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * @(#) $Id: SocketFilterChain.java 398039 2006-04-28 23:36:27Z proyal $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-
-import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
-
-/**
- */
-class SocketFilterChain extends AbstractIoFilterChain
-{
-
- SocketFilterChain(IoSession parent)
- {
- super(parent);
- }
-
- protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception
- {
- SocketSessionImpl s = (SocketSessionImpl) session;
-
- //write to socket
- try
- {
- s.getChannel().write(((ByteBuffer) writeRequest.getMessage()).buf());
-
- //notify of completion
- writeRequest.getFuture().setWritten(true);
- }
- catch(ClosedByInterruptException e)
- {
- writeRequest.getFuture().setWritten(false);
- }
- }
-
- protected void doClose(IoSession session) throws IOException
- {
- SocketSessionImpl s = (SocketSessionImpl) session;
- s.shutdown();
- }
-}
diff --git a/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java b/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java
deleted file mode 100644
index d534093533..0000000000
--- a/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.bio;
-
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.support.BaseIoSession;
-import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfigImpl;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- */
-class SocketSessionImpl extends BaseIoSession
-{
- private final IoService manager;
- private final SocketSessionConfig config;
- private final SocketFilterChain filterChain;
- private final IoHandler handler;
- private final SocketAddress remoteAddress;
- private final SocketAddress localAddress;
- private final SocketAddress serviceAddress;
- private final Socket socket;
- private final ByteChannel channel;
- private final Reader reader;
- private Thread runner;
- private int readBufferSize;
-
- /**
- * Creates a new instance.
- */
- SocketSessionImpl(IoService manager,
- SocketSessionConfig config,
- IoHandler handler,
- ByteChannel channel,
- SocketAddress serviceAddress) throws IOException
- {
- this.manager = manager;
- this.filterChain = new SocketFilterChain(this);
- this.handler = handler;
- this.channel = channel;
- if(channel instanceof SocketChannel)
- {
- socket = ((SocketChannel) channel).socket();
- }
- else if(channel instanceof SimpleSocketChannel)
- {
- socket = ((SimpleSocketChannel) channel).socket();
- }
- else
- {
- throw new IllegalArgumentException("Unrecognised channel type: " + channel.getClass());
- }
-
- this.remoteAddress = socket.getRemoteSocketAddress();
- this.localAddress = socket.getLocalSocketAddress();
- this.serviceAddress = serviceAddress;
-
- this.config = new SessionConfigImpl(config);
-
- reader = new Reader(handler, this);
- }
-
- void start()
- {
- //create & start thread for this...
- runner = new Thread(reader);
- runner.start();
- }
-
- void shutdown() throws IOException
- {
- filterChain.sessionClosed( this );
- reader.stop();
- channel.close();
- }
-
- ByteChannel getChannel()
- {
- return channel;
- }
-
- protected void write0(WriteRequest writeRequest)
- {
- filterChain.filterWrite(this, writeRequest);
- }
-
- protected void close0()
- {
- filterChain.filterClose(this);
- super.close0();
- }
-
- protected void updateTrafficMask()
- {
- //TODO
- }
-
- public IoService getService()
- {
- return manager;
- }
-
- public IoSessionConfig getConfig()
- {
- return config;
- }
-
- public IoFilterChain getFilterChain()
- {
- return filterChain;
- }
-
- public IoHandler getHandler()
- {
- return handler;
- }
-
- public int getScheduledWriteRequests()
- {
- return 0;
- }
-
- public int getScheduledWriteBytes()
- {
- return 0;
- }
-
- public TransportType getTransportType()
- {
- return TransportType.SOCKET;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return remoteAddress;
- }
-
- public SocketAddress getLocalAddress()
- {
- return localAddress;
- }
-
- public SocketAddress getServiceAddress()
- {
- return serviceAddress;
- }
-
- int getReadBufferSize()
- {
- return readBufferSize;
- }
-
- private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
- {
- SessionConfigImpl()
- {
- }
-
- SessionConfigImpl(SocketSessionConfig cfg)
- {
- setKeepAlive(cfg.isKeepAlive());
- setOobInline(cfg.isOobInline());
- setReceiveBufferSize(cfg.getReceiveBufferSize());
- readBufferSize = cfg.getReceiveBufferSize();
- setReuseAddress(cfg.isReuseAddress());
- setSendBufferSize(cfg.getSendBufferSize());
- setSoLinger(cfg.getSoLinger());
- setTcpNoDelay(cfg.isTcpNoDelay());
- if (getTrafficClass() != cfg.getTrafficClass())
- {
- setTrafficClass(cfg.getTrafficClass());
- }
- }
-
-
- public boolean isKeepAlive()
- {
- try
- {
- return socket.getKeepAlive();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setKeepAlive(boolean on)
- {
- try
- {
- socket.setKeepAlive(on);
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public boolean isOobInline()
- {
- try
- {
- return socket.getOOBInline();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setOobInline(boolean on)
- {
- try
- {
- socket.setOOBInline(on);
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public boolean isReuseAddress()
- {
- try
- {
- return socket.getReuseAddress();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setReuseAddress(boolean on)
- {
- try
- {
- socket.setReuseAddress(on);
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public int getSoLinger()
- {
- try
- {
- return socket.getSoLinger();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setSoLinger(int linger)
- {
- try
- {
- if (linger < 0)
- {
- socket.setSoLinger(false, 0);
- }
- else
- {
- socket.setSoLinger(true, linger);
- }
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public boolean isTcpNoDelay()
- {
- try
- {
- return socket.getTcpNoDelay();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setTcpNoDelay(boolean on)
- {
- try
- {
- socket.setTcpNoDelay(on);
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public int getTrafficClass()
- {
- if (SocketSessionConfigImpl.isGetTrafficClassAvailable())
- {
- try
- {
- return socket.getTrafficClass();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
- else
- {
- return 0;
- }
- }
-
- public void setTrafficClass(int tc)
- {
- if (SocketSessionConfigImpl.isSetTrafficClassAvailable())
- {
- try
- {
- socket.setTrafficClass(tc);
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
- }
-
- public int getSendBufferSize()
- {
- try
- {
- return socket.getSendBufferSize();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setSendBufferSize(int size)
- {
- if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable())
- {
- try
- {
- socket.setSendBufferSize(size);
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
- }
-
- public int getReceiveBufferSize()
- {
- try
- {
- return socket.getReceiveBufferSize();
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
-
- public void setReceiveBufferSize(int size)
- {
- if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable())
- {
- try
- {
- socket.setReceiveBufferSize(size);
- SocketSessionImpl.this.readBufferSize = size;
- }
- catch (SocketException e)
- {
- throw new RuntimeIOException(e);
- }
- }
- }
- }
-}
diff --git a/java/common/src/org/apache/qpid/nio/SocketAcceptor.java b/java/common/src/org/apache/qpid/nio/SocketAcceptor.java
deleted file mode 100644
index 1033d16191..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketAcceptor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.nio;
-
-import org.apache.mina.common.support.DelegatedIoAcceptor;
-
-public class SocketAcceptor extends DelegatedIoAcceptor
-{
- /**
- * Creates a new instance.
- */
- public SocketAcceptor()
- {
- init(new SocketAcceptorDelegate(this));
- }
-}
diff --git a/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java b/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java
deleted file mode 100644
index 7339482ac0..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketAcceptorDelegate.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * @(#) $Id: SocketAcceptorDelegate.java 379346 2006-02-21 05:10:30Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.nio;
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.IdentityHashSet;
-import org.apache.mina.util.Queue;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.*;
-
-/**
- * {@link IoAcceptor} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev: 379346 $, $Date: 2006-02-21 05:10:30 +0000 (Tue, 21 Feb 2006) $
- */
-public class SocketAcceptorDelegate extends BaseIoAcceptor
-{
- private static volatile int nextId = 0;
-
- private final IoAcceptor wrapper;
- private final int id = nextId ++;
- private final String threadName = "SocketAcceptor-" + id;
- private final IoServiceConfig defaultConfig = new SocketAcceptorConfig();
- private Selector selector;
- private final Map channels = new HashMap();
- private final Hashtable sessions = new Hashtable();
-
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
-
- private Worker worker;
-
- /**
- * Creates a new instance.
- */
- public SocketAcceptorDelegate(IoAcceptor wrapper)
- {
- this.wrapper = wrapper;
- }
-
- /**
- * Binds to the specified <code>address</code> and handles incoming
- * connections with the specified <code>handler</code>. Backlog value
- * is configured to the value of <code>backlog</code> property.
- *
- * @throws IOException if failed to bind
- */
- public void bind(SocketAddress address, IoHandler handler, IoServiceConfig config) throws IOException
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
-
- if (handler == null)
- {
- throw new NullPointerException("handler");
- }
-
- if (!(address instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected address type: " + address.getClass());
- }
-
- if (((InetSocketAddress) address).getPort() == 0)
- {
- throw new IllegalArgumentException("Unsupported port number: 0");
- }
-
- if (config == null)
- {
- config = getDefaultConfig();
- }
-
- RegistrationRequest request = new RegistrationRequest(address, handler, config);
-
- synchronized (this)
- {
- synchronized (registerQueue)
- {
- registerQueue.push(request);
- }
- startupWorker();
- }
-
- selector.wakeup();
-
- synchronized (request)
- {
- while (!request.done)
- {
- try
- {
- request.wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
-
- if (request.exception != null)
- {
- throw request.exception;
- }
- }
-
-
- private synchronized void startupWorker() throws IOException
- {
- if (worker == null)
- {
- selector = Selector.open();
- worker = new Worker();
-
- worker.start();
- }
- }
-
- public Set getManagedSessions(SocketAddress address)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
-
- Set managedSessions = (Set) sessions.get(address);
-
- if (managedSessions == null)
- {
- throw new IllegalArgumentException("Address not bound: " + address);
- }
-
- return Collections.unmodifiableSet(
- new IdentityHashSet(Arrays.asList(managedSessions.toArray())));
- }
-
- public void unbind(SocketAddress address)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
-
- final Set managedSessions = (Set) sessions.get(address);
- CancellationRequest request = new CancellationRequest(address);
- synchronized (this)
- {
- try
- {
- startupWorker();
- }
- catch (IOException e)
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException("Address not bound: " + address);
- }
-
- synchronized (cancelQueue)
- {
- cancelQueue.push(request);
- }
- }
-
- selector.wakeup();
-
- synchronized (request)
- {
- while (!request.done)
- {
- try
- {
- request.wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
-
- if (request.exception != null)
- {
- request.exception.fillInStackTrace();
-
- throw request.exception;
- }
-
- // Disconnect all clients
- IoServiceConfig cfg = request.registrationRequest.config;
- boolean disconnectOnUnbind;
- if (cfg instanceof IoAcceptorConfig)
- {
- disconnectOnUnbind = ((IoAcceptorConfig) cfg).isDisconnectOnUnbind();
- }
- else
- {
- disconnectOnUnbind = ((IoAcceptorConfig) getDefaultConfig()).isDisconnectOnUnbind();
- }
-
- if (disconnectOnUnbind && managedSessions != null)
- {
- IoSession[] tempSessions = (IoSession[])
- managedSessions.toArray(new IoSession[ 0 ]);
-
- final Object lock = new Object();
-
- for (int i = 0; i < tempSessions.length; i++)
- {
- if (!managedSessions.contains(tempSessions[i]))
- {
- // The session has already been closed and have been
- // removed from managedSessions by the SocketIoProcessor.
- continue;
- }
- tempSessions[i].close().setCallback(new IoFuture.Callback()
- {
- public void operationComplete(IoFuture future)
- {
- synchronized (lock)
- {
- lock.notify();
- }
- }
- });
- }
-
- try
- {
- synchronized (lock)
- {
- while (!managedSessions.isEmpty())
- {
- lock.wait(1000);
- }
- }
- }
- catch (InterruptedException ie)
- {
- // Ignored
- }
- }
- }
-
- public void unbindAll()
- {
- List addresses;
- synchronized (channels)
- {
- addresses = new ArrayList(channels.keySet());
- }
-
- for (Iterator i = addresses.iterator(); i.hasNext();)
- {
- unbind((SocketAddress) i.next());
- }
- }
-
- public boolean isBound(SocketAddress address)
- {
- synchronized (channels)
- {
- return channels.containsKey(address);
- }
- }
-
- public Set getBoundAddresses()
- {
- return wrapper.getBoundAddresses();
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super(SocketAcceptorDelegate.this.threadName);
- }
-
- public void run()
- {
- for (; ;)
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
- cancelKeys();
-
- if (nKeys > 0)
- {
- processSessions(selector.selectedKeys());
- }
-
- if (selector.keys().isEmpty())
- {
- synchronized (SocketAcceptorDelegate.this)
- {
- if (selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty())
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- }
- }
- }
- }
-
- private void processSessions(Set keys) throws IOException
- {
- Iterator it = keys.iterator();
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- it.remove();
-
- if (!key.isAcceptable())
- {
- continue;
- }
-
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
-
- SocketChannel ch = ssc.accept();
-
- if (ch == null)
- {
- continue;
- }
-
- boolean success = false;
- SocketSessionImpl session = null;
- try
- {
- RegistrationRequest req = (RegistrationRequest) key.attachment();
- session = new SocketSessionImpl(
- SocketAcceptorDelegate.this.wrapper,
- (Set) sessions.get(req.address),
- (SocketSessionConfig) req.config.getSessionConfig(),
- ch, req.handler,
- req.address);
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- req.config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- ((SocketFilterChain) session.getFilterChain()).sessionCreated(session);
- session.getManagedSessions().add(session);
- session.getIoProcessor().addNew(session);
- success = true;
- }
- catch (Throwable t)
- {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- }
- finally
- {
- if (!success)
- {
- if (session != null)
- {
- session.getManagedSessions().remove(session);
- }
- ch.close();
- }
- }
- }
- }
- }
-
- public IoServiceConfig getDefaultConfig()
- {
- return defaultConfig;
- }
-
- private void registerNew()
- {
- if (registerQueue.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- RegistrationRequest req;
-
- synchronized (registerQueue)
- {
- req = (RegistrationRequest) registerQueue.pop();
- }
-
- if (req == null)
- {
- break;
- }
-
- ServerSocketChannel ssc = null;
-
- try
- {
- ssc = ServerSocketChannel.open();
- ssc.configureBlocking(false);
-
- // Configure the server socket,
- SocketAcceptorConfig cfg;
- if (req.config instanceof SocketAcceptorConfig)
- {
- cfg = (SocketAcceptorConfig) req.config;
- }
- else
- {
- cfg = (SocketAcceptorConfig) getDefaultConfig();
- }
-
- ssc.socket().setReuseAddress(cfg.isReuseAddress());
- ssc.socket().setReceiveBufferSize(
- ((SocketSessionConfig) cfg.getSessionConfig()).getReceiveBufferSize());
-
- // and bind.
- ssc.socket().bind(req.address, cfg.getBacklog());
- ssc.register(selector, SelectionKey.OP_ACCEPT, req);
-
- synchronized (channels)
- {
- channels.put(req.address, ssc);
- }
- sessions.put(req.address, Collections.synchronizedSet(new HashSet()));
- }
- catch (IOException e)
- {
- req.exception = e;
- }
- finally
- {
- synchronized (req)
- {
- req.done = true;
-
- req.notify();
- }
-
- if (ssc != null && req.exception != null)
- {
- try
- {
- ssc.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
- }
-
-
- private void cancelKeys()
- {
- if (cancelQueue.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- CancellationRequest request;
-
- synchronized (cancelQueue)
- {
- request = (CancellationRequest) cancelQueue.pop();
- }
-
- if (request == null)
- {
- break;
- }
-
- sessions.remove(request.address);
- ServerSocketChannel ssc;
- synchronized (channels)
- {
- ssc = (ServerSocketChannel) channels.remove(request.address);
- }
-
- // close the channel
- try
- {
- if (ssc == null)
- {
- request.exception = new IllegalArgumentException("Address not bound: " + request.address);
- }
- else
- {
- SelectionKey key = ssc.keyFor(selector);
- request.registrationRequest = (RegistrationRequest) key.attachment();
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
-
- ssc.close();
- }
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- synchronized (request)
- {
- request.done = true;
- request.notify();
- }
- }
- }
- }
-
- private static class RegistrationRequest
- {
- private final SocketAddress address;
- private final IoHandler handler;
- private final IoServiceConfig config;
- private IOException exception;
- private boolean done;
-
- private RegistrationRequest(SocketAddress address, IoHandler handler, IoServiceConfig config)
- {
- this.address = address;
- this.handler = handler;
- this.config = config;
- }
- }
-
-
- private static class CancellationRequest
- {
- private final SocketAddress address;
- private boolean done;
- private RegistrationRequest registrationRequest;
- private RuntimeException exception;
-
- private CancellationRequest(SocketAddress address)
- {
- this.address = address;
- }
- }
-}
diff --git a/java/common/src/org/apache/qpid/nio/SocketConnector.java b/java/common/src/org/apache/qpid/nio/SocketConnector.java
deleted file mode 100644
index ce74fd0c96..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketConnector.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.nio;
-
-import org.apache.mina.common.support.DelegatedIoConnector;
-import org.apache.mina.transport.socket.nio.support.*;
-
-public class SocketConnector extends DelegatedIoConnector
-{
- /**
- * Creates a new instance.
- */
- public SocketConnector()
- {
- init(new SocketConnectorDelegate(this));
- }
-} \ No newline at end of file
diff --git a/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java b/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java
deleted file mode 100644
index 50c122d1c8..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketConnectorDelegate.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * @(#) $Id: SocketConnectorDelegate.java 379044 2006-02-20 07:40:37Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.nio;
-
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.util.Queue;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * {@link IoConnector} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev: 379044 $, $Date: 2006-02-20 07:40:37 +0000 (Mon, 20 Feb 2006) $
- */
-public class SocketConnectorDelegate extends BaseIoConnector
-{
- private static volatile int nextId = 0;
-
- private final IoConnector wrapper;
- private final int id = nextId++;
- private final String threadName = "SocketConnector-" + id;
- private final IoServiceConfig defaultConfig = new SocketConnectorConfig();
- private Selector selector;
- private final Queue connectQueue = new Queue();
- private final Set managedSessions = Collections.synchronizedSet(new HashSet());
- private Worker worker;
-
- /**
- * Creates a new instance.
- */
- public SocketConnectorDelegate(IoConnector wrapper)
- {
- this.wrapper = wrapper;
- }
-
- public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
- {
- return connect(address, null, handler, config);
- }
-
- public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
- IoHandler handler, IoServiceConfig config)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
- if (handler == null)
- {
- throw new NullPointerException("handler");
- }
-
- if (! (address instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected address type: "
- + address.getClass());
- }
-
- if (localAddress != null && !(localAddress instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected local address type: "
- + localAddress.getClass());
- }
-
- if (config == null)
- {
- config = getDefaultConfig();
- }
-
- SocketChannel ch = null;
- boolean success = false;
- try
- {
- ch = SocketChannel.open();
- ch.socket().setReuseAddress(true);
- if (localAddress != null)
- {
- ch.socket().bind(localAddress);
- }
-
- ch.configureBlocking(false);
-
- if (ch.connect(address))
- {
- SocketSessionImpl session = newSession(ch, handler, config);
- success = true;
- ConnectFuture future = new DefaultConnectFuture();
- future.setSession(session);
- return future;
- }
-
- success = true;
- }
- catch (IOException e)
- {
- return DefaultConnectFuture.newFailedFuture(e);
- }
- finally
- {
- if (!success && ch != null)
- {
- try
- {
- ch.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
-
- ConnectionRequest request = new ConnectionRequest(ch, handler, config);
- synchronized (this)
- {
- try
- {
- startupWorker();
- }
- catch (IOException e)
- {
- try
- {
- ch.close();
- }
- catch (IOException e2)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e2);
- }
-
- return DefaultConnectFuture.newFailedFuture(e);
- }
- synchronized (connectQueue)
- {
- connectQueue.push(request);
- }
- selector.wakeup();
- }
-
- return request;
- }
-
- public IoServiceConfig getDefaultConfig()
- {
- return defaultConfig;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if (worker == null)
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- private void registerNew()
- {
- if (connectQueue.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- ConnectionRequest req;
- synchronized (connectQueue)
- {
- req = (ConnectionRequest) connectQueue.pop();
- }
-
- if (req == null)
- {
- break;
- }
-
- SocketChannel ch = req.channel;
- try
- {
- ch.register(selector, SelectionKey.OP_CONNECT, req);
- }
- catch (IOException e)
- {
- req.setException(e);
- }
- }
- }
-
- private void processSessions(Set keys)
- {
- Iterator it = keys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- if (!key.isConnectable())
- {
- continue;
- }
-
- SocketChannel ch = (SocketChannel) key.channel();
- ConnectionRequest entry = (ConnectionRequest) key.attachment();
-
- boolean success = false;
- try
- {
- ch.finishConnect();
- SocketSessionImpl session = newSession(ch, entry.handler, entry.config);
- entry.setSession(session);
- success = true;
- }
- catch (Throwable e)
- {
- entry.setException(e);
- }
- finally
- {
- key.cancel();
- if (!success)
- {
- try
- {
- ch.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
-
- keys.clear();
- }
-
- private void processTimedOutSessions(Set keys)
- {
- long currentTime = System.currentTimeMillis();
- Iterator it = keys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- if (!key.isValid())
- {
- continue;
- }
-
- ConnectionRequest entry = (ConnectionRequest) key.attachment();
-
- if (currentTime >= entry.deadline)
- {
- entry.setException(new ConnectException());
- key.cancel();
- }
- }
- }
-
- private SocketSessionImpl newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config) throws IOException
- {
- SocketSessionImpl session = new SocketSessionImpl(
- wrapper, managedSessions,
- config.getSessionConfig(),
- ch, handler, ch.socket().getRemoteSocketAddress());
- try
- {
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- ((SocketFilterChain) session.getFilterChain()).sessionCreated(session);
- }
- catch (Throwable e)
- {
- throw (IOException) new IOException("Failed to create a session.").initCause(e);
- }
- session.getManagedSessions().add(session);
- session.getIoProcessor().addNew(session);
- return session;
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super(SocketConnectorDelegate.this.threadName);
- }
-
- public void run()
- {
- for (; ;)
- {
- try
- {
- int nKeys = selector.select(1000);
-
- registerNew();
-
- if (nKeys > 0)
- {
- processSessions(selector.selectedKeys());
- }
-
- processTimedOutSessions(selector.keys());
-
- if (selector.keys().isEmpty())
- {
- synchronized (SocketConnectorDelegate.this)
- {
- if (selector.keys().isEmpty() &&
- connectQueue.isEmpty())
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- }
- }
- }
- }
- }
-
- private class ConnectionRequest extends DefaultConnectFuture
- {
- private final SocketChannel channel;
- private final long deadline;
- private final IoHandler handler;
- private final IoServiceConfig config;
-
- private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
- {
- this.channel = channel;
- long timeout;
- if (config instanceof IoConnectorConfig)
- {
- timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
- }
- else
- {
- timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
- }
- this.deadline = System.currentTimeMillis() + timeout;
- this.handler = handler;
- this.config = config;
- }
- }
-} \ No newline at end of file
diff --git a/java/common/src/org/apache/qpid/nio/SocketFilterChain.java b/java/common/src/org/apache/qpid/nio/SocketFilterChain.java
deleted file mode 100644
index 07143512b5..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketFilterChain.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.qpid.nio;
-
-import java.io.IOException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoFilterChain} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project
- */
-class SocketFilterChain extends AbstractIoFilterChain {
-
- public SocketFilterChain( IoSession parent )
- {
- super( parent );
- }
-
- protected void doWrite( IoSession session, WriteRequest writeRequest )
- {
- SocketSessionImpl s = ( SocketSessionImpl ) session;
- Queue writeRequestQueue = s.getWriteRequestQueue();
-
- // SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
- ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.push( writeRequest );
- if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
- {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush( s );
- }
- }
- }
-
- protected void doClose( IoSession session ) throws IOException
- {
- SocketSessionImpl s = ( SocketSessionImpl ) session;
- s.getIoProcessor().remove( s );
- }
-}
diff --git a/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java b/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java
deleted file mode 100644
index d20ab41b96..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketIoProcessor.java
+++ /dev/null
@@ -1,770 +0,0 @@
-/*
- * @(#) $Id: SocketIoProcessor.java 372449 2006-01-26 05:24:58Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.nio;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.util.Queue;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * Performs all I/O operations for sockets which is connected or bound.
- * This class is used by MINA internally.
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev: 372449 $, $Date: 2006-01-26 05:24:58 +0000 (Thu, 26 Jan 2006) $,
- */
-class SocketIoProcessor
-{
- private static final Logger _logger = Logger.getLogger(SocketIoProcessor.class);
-
- private static final String PROCESSORS_PROPERTY = "mina.socket.processors";
- private static final String THREAD_PREFIX = "SocketIoProcessor-";
- private static final int DEFAULT_PROCESSORS = 1;
- private static final int PROCESSOR_COUNT;
- private static final SocketIoProcessor[] PROCESSORS;
-
- private static int nextId;
-
- static
- {
- PROCESSOR_COUNT = configureProcessorCount();
- PROCESSORS = createProcessors();
- }
-
- /**
- * Returns the {@link SocketIoProcessor} to be used for a newly
- * created session
- *
- * @return The processor to be employed
- */
- static synchronized SocketIoProcessor getInstance()
- {
- SocketIoProcessor processor = PROCESSORS[nextId ++];
- nextId %= PROCESSOR_COUNT;
- return processor;
- }
-
- private final String threadName;
- private Selector selector;
-
- private final Queue newSessions = new Queue();
- private final Queue removingSessions = new Queue();
- private final Queue flushingSessions = new Queue();
- private final Queue trafficControllingSessions = new Queue();
-
- private Worker worker;
- private long lastIdleCheckTime = System.currentTimeMillis();
-
- private SocketIoProcessor(String threadName)
- {
- this.threadName = threadName;
- }
-
- void addNew(SocketSessionImpl session) throws IOException
- {
- synchronized (this)
- {
- synchronized (newSessions)
- {
- newSessions.push(session);
- }
- startupWorker();
- }
-
- selector.wakeup();
- }
-
- void remove(SocketSessionImpl session) throws IOException
- {
- scheduleRemove(session);
- startupWorker();
- selector.wakeup();
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if (worker == null)
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- void flush(SocketSessionImpl session)
- {
- scheduleFlush(session);
- Selector selector = this.selector;
- if (selector != null)
- {
- selector.wakeup();
- }
- }
-
- void updateTrafficMask(SocketSessionImpl session)
- {
- scheduleTrafficControl(session);
- Selector selector = this.selector;
- if (selector != null)
- {
- selector.wakeup();
- }
- }
-
- private void scheduleRemove(SocketSessionImpl session)
- {
- synchronized (removingSessions)
- {
- removingSessions.push(session);
- }
- }
-
- private void scheduleFlush(SocketSessionImpl session)
- {
- synchronized (flushingSessions)
- {
- flushingSessions.push(session);
- }
- }
-
- private void scheduleTrafficControl(SocketSessionImpl session)
- {
- synchronized (trafficControllingSessions)
- {
- trafficControllingSessions.push(session);
- }
- }
-
- private void doAddNew()
- {
- if (newSessions.isEmpty())
- {
- return;
- }
-
- SocketSessionImpl session;
-
- for (; ;)
- {
- synchronized (newSessions)
- {
- session = (SocketSessionImpl) newSessions.pop();
- }
-
- if (session == null)
- {
- break;
- }
-
- SocketChannel ch = session.getChannel();
- boolean registered;
-
- try
- {
- ch.configureBlocking(false);
- session.setSelectionKey(ch.register(selector,
- SelectionKey.OP_READ,
- session));
- registered = true;
- }
- catch (IOException e)
- {
- session.getManagedSessions().remove(session);
- registered = false;
- ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e);
- }
-
- if (registered)
- {
- ((SocketFilterChain) session.getFilterChain()).sessionOpened(session);
- }
- }
- }
-
- private void doRemove()
- {
- if (removingSessions.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- SocketSessionImpl session;
-
- synchronized (removingSessions)
- {
- session = (SocketSessionImpl) removingSessions.pop();
- }
-
- if (session == null)
- {
- break;
- }
-
- SocketChannel ch = session.getChannel();
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.close() is called before addSession() is processed)
- if (key == null)
- {
- scheduleRemove(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid())
- {
- continue;
- }
-
- try
- {
- key.cancel();
- ch.close();
- }
- catch (IOException e)
- {
- ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e);
- }
- finally
- {
- releaseWriteBuffers(session);
- session.getManagedSessions().remove(session);
-
- ((SocketFilterChain) session.getFilterChain()).sessionClosed(session);
- session.getCloseFuture().setClosed();
- }
- }
- }
-
- private void process(Set selectedKeys)
- {
- Iterator it = selectedKeys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
- SocketSessionImpl session = (SocketSessionImpl) key.attachment();
-
- if (key.isReadable() && session.getTrafficMask().isReadable())
- {
- read(session);
- }
-
- if (key.isWritable() && session.getTrafficMask().isWritable())
- {
- scheduleFlush(session);
- }
- }
-
- selectedKeys.clear();
- }
-
- private void read(SocketSessionImpl session)
- {
- ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
- SocketChannel ch = session.getChannel();
-
- try
- {
- int readBytes = 0;
- int ret;
-
- buf.clear();
-
- try
- {
- while ((ret = ch.read(buf.buf())) > 0)
- {
- readBytes += ret;
- }
- }
- finally
- {
- buf.flip();
- }
-
- session.increaseReadBytes(readBytes);
-
- if (readBytes > 0)
- {
- /*ByteBuffer newBuf = ByteBuffer.allocate(readBytes);
- newBuf.put(buf);
- newBuf.flip();*/
- //((SocketFilterChain) session.getFilterChain()).messageReceived(session, newBuf);
- ((SocketFilterChain) session.getFilterChain()).messageReceived(session, buf);
- }
- if (ret < 0)
- {
- scheduleRemove(session);
- }
- }
- catch (Throwable e)
- {
- if (e instanceof IOException)
- {
- scheduleRemove(session);
- }
- buf.release();
- ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e);
- }
- /*finally
- {
- buf.release();
- } */
- }
-
- private void notifyIdleness()
- {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if ((currentTime - lastIdleCheckTime) >= 1000)
- {
- lastIdleCheckTime = currentTime;
- Set keys = selector.keys();
- if (keys != null)
- {
- for (Iterator it = keys.iterator(); it.hasNext();)
- {
- SelectionKey key = (SelectionKey) it.next();
- SocketSessionImpl session = (SocketSessionImpl) key.attachment();
- notifyIdleness(session, currentTime);
- }
- }
- }
- }
-
- private void notifyIdleness(SocketSessionImpl session, long currentTime)
- {
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
- IdleStatus.BOTH_IDLE,
- Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
- IdleStatus.READER_IDLE,
- Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
- IdleStatus.WRITER_IDLE,
- Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-
- notifyWriteTimeout(session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime());
- }
-
- private void notifyIdleness0(SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime)
- {
- if (idleTime > 0 && lastIoTime != 0
- && (currentTime - lastIoTime) >= idleTime)
- {
- session.increaseIdleCount(status);
- ((SocketFilterChain) session.getFilterChain()).sessionIdle(session, status);
- }
- }
-
- private void notifyWriteTimeout(SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime)
- {
- SelectionKey key = session.getSelectionKey();
- if (writeTimeout > 0
- && (currentTime - lastIoTime) >= writeTimeout
- && key != null && key.isValid()
- && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
- {
- ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, new WriteTimeoutException());
- }
- }
-
- private void doFlush()
- {
- if (flushingSessions.size() == 0)
- {
- return;
- }
-
- for (; ;)
- {
- SocketSessionImpl session;
-
- synchronized (flushingSessions)
- {
- session = (SocketSessionImpl) flushingSessions.pop();
- }
-
- if (session == null)
- {
- break;
- }
-
- if (!session.isConnected())
- {
- releaseWriteBuffers(session);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession() is processed)
- if (key == null)
- {
- scheduleFlush(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid())
- {
- continue;
- }
-
- try
- {
- doFlush(session);
- }
- catch (IOException e)
- {
- scheduleRemove(session);
- ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e);
- }
- }
- }
-
- private void releaseWriteBuffers(SocketSessionImpl session)
- {
- Queue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
-
- while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
- {
- try
- {
- ((ByteBuffer) req.getMessage()).release();
- }
- catch (IllegalStateException e)
- {
- ((SocketFilterChain) session.getFilterChain()).exceptionCaught(session, e);
- }
- finally
- {
- req.getFuture().setWritten(false);
- }
- }
- }
-
- private void doFlush(SocketSessionImpl session) throws IOException
- {
- // Clear OP_WRITE
- SelectionKey key = session.getSelectionKey();
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- SocketChannel ch = session.getChannel();
- Queue writeRequestQueue = session.getWriteRequestQueue();
-
- WriteRequest req;
- for (; ;)
- {
- synchronized (writeRequestQueue)
- {
- req = (WriteRequest) writeRequestQueue.first();
- }
-
- if (req == null)
- {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0)
- {
- synchronized (writeRequestQueue)
- {
- writeRequestQueue.pop();
- }
-
- session.increaseWrittenWriteRequests();
- buf.reset();
- ((SocketFilterChain) session.getFilterChain()).messageSent(session, req);
- continue;
- }
-
- int writtenBytes = ch.write(buf.buf());
- if (writtenBytes > 0)
- {
- session.increaseWrittenBytes(writtenBytes);
- }
-
- if (buf.hasRemaining())
- {
- //_logger.info("Kernel buf full");
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- selector.wakeup();
- break;
- }
- }
- }
-
- private void doUpdateTrafficMask()
- {
- if (trafficControllingSessions.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- SocketSessionImpl session;
-
- synchronized (trafficControllingSessions)
- {
- session = (SocketSessionImpl) trafficControllingSessions.pop();
- }
-
- if (session == null)
- {
- break;
- }
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- if (key == null)
- {
- scheduleTrafficControl(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid())
- {
- continue;
- }
-
- // The normal is OP_READ and, if there are write requests in the
- // session's write queue, set OP_WRITE to trigger flushing.
- int ops = SelectionKey.OP_READ;
- Queue writeRequestQueue = session.getWriteRequestQueue();
- synchronized (writeRequestQueue)
- {
- if (!writeRequestQueue.isEmpty())
- {
- ops |= SelectionKey.OP_WRITE;
- }
- }
-
- // Now mask the preferred ops with the mask of the current session
- int mask = session.getTrafficMask().getInterestOps();
- key.interestOps(ops & mask);
- }
- }
-
- /**
- * Configures the number of processors employed.
- * We first check for a system property "mina.IoProcessors". If this
- * property is present and can be interpreted as an integer value greater
- * or equal to 1, this value is used as the number of processors.
- * Otherwise a default of 1 processor is employed.
- *
- * @return The nubmer of processors to employ
- */
- private static int configureProcessorCount()
- {
- int processors = DEFAULT_PROCESSORS;
- String processorProperty = System.getProperty(PROCESSORS_PROPERTY);
- if (processorProperty != null)
- {
- try
- {
- processors = Integer.parseInt(processorProperty);
- }
- catch (NumberFormatException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- processors = Math.max(processors, 1);
-
- System.setProperty(PROCESSORS_PROPERTY, String.valueOf(processors));
- }
-
- return processors;
- }
-
- private static SocketIoProcessor[] createProcessors()
- {
- SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ];
- for (int i = 0; i < PROCESSOR_COUNT; i ++)
- {
- processors[i] = new SocketIoProcessor(THREAD_PREFIX + i);
- }
- return processors;
- }
-
- private class WorkerFlusher implements Runnable
- {
- private volatile boolean _shutdown = false;
-
- private volatile boolean _sleep = false;
-
- private final Object _lock = new Object();
-
- public void run()
- {
- while (!_shutdown)
- {
- doFlush();
- try
- {
- sleep();
- }
- catch (InterruptedException e)
- {
- // IGNORE
- }
- }
- _logger.info("Flusher shutting down");
- }
-
- private void sleep() throws InterruptedException
- {
- synchronized (_lock)
- {
- while (_sleep && !_shutdown)
- {
- _logger.debug("Flusher going to sleep");
- _lock.wait();
- }
- _sleep = true;
- }
- }
-
- void wakeup()
- {
- synchronized (_lock)
- {
- if (_sleep)
- {
- _logger.debug("Waking up flusher");
- _sleep = false;
- _lock.notify();
- }
- }
- }
-
- void shutdown()
- {
- _shutdown = true;
- wakeup();
- }
- }
-
- private class Worker extends Thread
- {
- private WorkerFlusher _flusher;
-
- public Worker()
- {
- super(SocketIoProcessor.this.threadName);
- _flusher = new WorkerFlusher();
- new Thread(_flusher, SocketIoProcessor.this.threadName + "Flusher").start();
- }
-
- public void run()
- {
- for (; ;)
- {
- try
- {
- int nKeys = selector.select(1000);
- doAddNew();
- doUpdateTrafficMask();
-
- if (nKeys > 0)
- {
- process(selector.selectedKeys());
- }
-
- //doFlush();
- // in case the flusher has gone to sleep we wake it up
- if (flushingSessions.size() > 0)
- {
- _flusher.wakeup();
- }
- doRemove();
- notifyIdleness();
-
- if (selector.keys().isEmpty())
- {
- synchronized (SocketIoProcessor.this)
- {
- if (selector.keys().isEmpty() &&
- newSessions.isEmpty())
- {
- worker = null;
- _flusher.shutdown();
- try
- {
- selector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch (Throwable t)
- {
- ExceptionMonitor.getInstance().exceptionCaught(t);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- }
- }
- }
- }
- }
-
-}
diff --git a/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java b/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java
deleted file mode 100644
index f7c74f7a14..0000000000
--- a/java/common/src/org/apache/qpid/nio/SocketSessionImpl.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * @(#) $Id: SocketSessionImpl.java 385247 2006-03-12 05:06:11Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.nio;
-
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.*;
-import org.apache.mina.common.support.BaseIoSession;
-import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.Queue;
-
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-
-/**
- * An {@link IoSession} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev: 385247 $, $Date: 2006-03-12 05:06:11 +0000 (Sun, 12 Mar 2006) $
- */
-class SocketSessionImpl extends BaseIoSession
-{
- private final IoService manager;
- private final SocketSessionConfig config = new SocketSessionConfigImpl();
- private final SocketIoProcessor ioProcessor;
- private final SocketFilterChain filterChain;
- private final SocketChannel ch;
- private final Queue writeRequestQueue;
- private final IoHandler handler;
- private final SocketAddress remoteAddress;
- private final SocketAddress localAddress;
- private final SocketAddress serviceAddress;
- private final Set managedSessions;
- private SelectionKey key;
- private int readBufferSize;
-
- /**
- * Creates a new instance.
- */
- public SocketSessionImpl(
- IoService manager, Set managedSessions,
- IoSessionConfig config,
- SocketChannel ch, IoHandler defaultHandler,
- SocketAddress serviceAddress )
- {
- this.manager = manager;
- this.managedSessions = managedSessions;
- this.ioProcessor = SocketIoProcessor.getInstance();
- this.filterChain = new SocketFilterChain( this );
- this.ch = ch;
- this.writeRequestQueue = new Queue();
- this.handler = defaultHandler;
- this.remoteAddress = ch.socket().getRemoteSocketAddress();
- this.localAddress = ch.socket().getLocalSocketAddress();
- this.serviceAddress = serviceAddress;
-
- // Apply the initial session settings
- if( config instanceof SocketSessionConfig )
- {
- SocketSessionConfig cfg = ( SocketSessionConfig ) config;
- this.config.setKeepAlive( cfg.isKeepAlive() );
- this.config.setOobInline( cfg.isOobInline() );
- this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() );
- this.readBufferSize = cfg.getReceiveBufferSize();
- this.config.setReuseAddress( cfg.isReuseAddress() );
- this.config.setSendBufferSize( cfg.getSendBufferSize() );
- this.config.setSoLinger( cfg.getSoLinger() );
- this.config.setTcpNoDelay( cfg.isTcpNoDelay() );
-
- if( this.config.getTrafficClass() != cfg.getTrafficClass() )
- {
- this.config.setTrafficClass( cfg.getTrafficClass() );
- }
- }
- }
-
- public IoService getService()
- {
- return manager;
- }
-
- public IoSessionConfig getConfig()
- {
- return config;
- }
-
- SocketIoProcessor getIoProcessor()
- {
- return ioProcessor;
- }
-
- public IoFilterChain getFilterChain()
- {
- return filterChain;
- }
-
- SocketChannel getChannel()
- {
- return ch;
- }
-
- Set getManagedSessions()
- {
- return managedSessions;
- }
-
- SelectionKey getSelectionKey()
- {
- return key;
- }
-
- void setSelectionKey( SelectionKey key )
- {
- this.key = key;
- }
-
- public IoHandler getHandler()
- {
- return handler;
- }
-
- protected void close0()
- {
- filterChain.filterClose( this );
- }
-
- Queue getWriteRequestQueue()
- {
- return writeRequestQueue;
- }
-
- public int getScheduledWriteRequests()
- {
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.size();
- }
- }
-
- public int getScheduledWriteBytes()
- {
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.byteSize();
- }
- }
-
- protected void write0( WriteRequest writeRequest )
- {
- filterChain.filterWrite( this, writeRequest );
- }
-
- public TransportType getTransportType()
- {
- return TransportType.SOCKET;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return remoteAddress;
- }
-
- public SocketAddress getLocalAddress()
- {
- return localAddress;
- }
-
- public SocketAddress getServiceAddress()
- {
- return serviceAddress;
- }
-
- protected void updateTrafficMask()
- {
- this.ioProcessor.updateTrafficMask( this );
- }
-
- int getReadBufferSize()
- {
- return readBufferSize;
- }
-
- private class SocketSessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
- {
- public boolean isKeepAlive()
- {
- try
- {
- return ch.socket().getKeepAlive();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setKeepAlive( boolean on )
- {
- try
- {
- ch.socket().setKeepAlive( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public boolean isOobInline()
- {
- try
- {
- return ch.socket().getOOBInline();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setOobInline( boolean on )
- {
- try
- {
- ch.socket().setOOBInline( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public boolean isReuseAddress()
- {
- try
- {
- return ch.socket().getReuseAddress();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setReuseAddress( boolean on )
- {
- try
- {
- ch.socket().setReuseAddress( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public int getSoLinger()
- {
- try
- {
- return ch.socket().getSoLinger();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setSoLinger( int linger )
- {
- try
- {
- if( linger < 0 )
- {
- ch.socket().setSoLinger( false, 0 );
- }
- else
- {
- ch.socket().setSoLinger( true, linger );
- }
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public boolean isTcpNoDelay()
- {
- try
- {
- return ch.socket().getTcpNoDelay();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setTcpNoDelay( boolean on )
- {
- try
- {
- ch.socket().setTcpNoDelay( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public int getTrafficClass()
- {
- try
- {
- return ch.socket().getTrafficClass();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setTrafficClass( int tc )
- {
- try
- {
- ch.socket().setTrafficClass( tc );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public int getSendBufferSize()
- {
- try
- {
- return ch.socket().getSendBufferSize();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setSendBufferSize( int size )
- {
- try
- {
- ch.socket().setSendBufferSize( size );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public int getReceiveBufferSize()
- {
- try
- {
- return ch.socket().getReceiveBufferSize();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setReceiveBufferSize( int size )
- {
- try
- {
- ch.socket().setReceiveBufferSize( size );
- SocketSessionImpl.this.readBufferSize = size;
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
- }
-}
diff --git a/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java
index 5de2bf8d0e..bed9ac0b99 100644
--- a/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ b/java/common/src/org/apache/qpid/pool/ReadWriteThreadModel.java
@@ -18,7 +18,7 @@
package org.apache.qpid.pool;
import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.ReferenceCountingIoFilter;
+import org.apache.mina.filter.ReferenceCountingIoFilter;
import org.apache.mina.common.ThreadModel;
public class ReadWriteThreadModel implements ThreadModel