summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:08:44 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:08:44 +0000
commitbfd022e2307b6628ee63e316f042ffb9b85300f7 (patch)
tree8c15d0ed1d363e0c44f9698ce3b6852fe71743b4
parent122b2d411f119e4b46b77f20dc5002981db204a8 (diff)
downloadqpid-python-bfd022e2307b6628ee63e316f042ffb9b85300f7.tar.gz
QPID-3341: remove unused/dead transport code and accompanying implementation classes
Applied patch by Keith Wall and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1143865 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/etc/config.xml11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java39
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java65
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java17
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java272
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java547
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java486
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java1026
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java240
-rw-r--r--qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java488
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java208
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java108
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java154
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java29
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java130
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java85
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java81
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java274
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java90
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java135
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java126
-rw-r--r--qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java396
-rw-r--r--qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java157
-rw-r--r--qpid/java/systests/etc/config-systests-firewall-2.xml11
-rw-r--r--qpid/java/systests/etc/config-systests-firewall-3.xml11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java15
30 files changed, 26 insertions, 5316 deletions
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml
index c0f9b4df61..f4758d77a8 100644
--- a/qpid/java/broker/etc/config.xml
+++ b/qpid/java/broker/etc/config.xml
@@ -37,17 +37,10 @@
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>true</enabled>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index f152865a27..5908eb4bd8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -52,9 +52,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
// Default Configuration values
- public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144;
- public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144;
- public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED = false;
+ public static final int DEFAULT_BUFFER_SIZE = 262144;
public static final String DEFAULT_STATUS_UPDATES = "on";
public static final String SECURITY_CONFIG_RELOADED = "SECURITY CONFIGURATION RELOADED";
@@ -84,9 +82,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
// Configuration values to be read from the configuration file
//todo Move all properties to static values to ensure system testing can be performed.
- public static final String CONNECTOR_PROTECTIO_ENABLED = "connector.protectio.enabled";
- public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE = "connector.protectio.readBufferLimitSize";
- public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE = "connector.protectio.writeBufferLimitSize";
public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket";
public static final String STATUS_UPDATES = "status-updates";
public static final String ADVANCED_LOCALE = "advanced.locale";
@@ -95,7 +90,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
envVarMap.put("QPID_PORT", "connector.port");
envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers");
envVarMap.put("QPID_SSLPORT", "connector.ssl.port");
- envVarMap.put("QPID_NIO", "connector.qpidnio");
envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool");
envVarMap.put("QPID_JMXPORT", "management.jmxport");
envVarMap.put("QPID_FRAMESIZE", "advanced.framesize");
@@ -545,21 +539,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE);
}
- public boolean getProtectIOEnabled()
- {
- return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED, DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED);
- }
-
- public int getBufferReadLimit()
- {
- return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_READ_LIMIT_SIZE);
- }
-
- public int getBufferWriteLimit()
- {
- return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_WRITE_LIMIT_SIZE);
- }
-
public boolean getSynchedClocks()
{
return getBooleanValue("advanced.synced-clocks");
@@ -687,12 +666,12 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public int getReceiveBufferSize()
{
- return getIntValue("connector.socketReceiveBuffer", 32767);
+ return getIntValue("connector.socketReceiveBuffer", DEFAULT_BUFFER_SIZE);
}
public int getWriteBufferSize()
{
- return getIntValue("connector.socketWriteBuffer", 32767);
+ return getIntValue("connector.socketWriteBuffer", DEFAULT_BUFFER_SIZE);
}
public boolean getTcpNoDelay()
@@ -735,11 +714,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
return getStringValue("connector.ssl.certType", "SunX509");
}
- public boolean getQpidNIO()
- {
- return getBooleanValue("connector.qpidnio");
- }
-
public boolean getUseBiasedWrites()
{
return getBooleanValue("advanced.useWriteBiasedPool");
@@ -809,8 +783,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public Boolean getTcpNoDelay()
{
- // Can't call parent getTcpNoDelay since it just calls this one
- return getBooleanValue("connector.tcpNoDelay", true);
+ return ServerConfiguration.this.getTcpNoDelay();
}
public Integer getSoTimeout()
@@ -825,7 +798,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public Integer getSendBufferSize()
{
- return getBufferWriteLimit();
+ return ServerConfiguration.this.getWriteBufferSize();
}
public Boolean getReuseAddress()
@@ -835,7 +808,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public Integer getReceiveBufferSize()
{
- return getBufferReadLimit();
+ return ServerConfiguration.this.getReceiveBufferSize();
}
public Boolean getOOBInline()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 494003c8a0..484f93cb88 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -187,49 +187,6 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
assertEquals(23, serverConfig.getFrameSize());
}
- public void testGetProtectIOEnabled() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(false, serverConfig.getProtectIOEnabled());
-
- // Check value we set
- _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED, true);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(true, serverConfig.getProtectIOEnabled());
- }
-
- public void testGetBufferReadLimit() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(262144, serverConfig.getBufferReadLimit());
-
- // Check value we set
- _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, 23);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(23, serverConfig.getBufferReadLimit());
- }
-
- public void testGetBufferWriteLimit() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(262144, serverConfig.getBufferWriteLimit());
-
- // Check value we set
- _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, 23);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(23, serverConfig.getBufferWriteLimit());
- }
-
-
public void testGetStatusEnabled() throws ConfigurationException
{
// Check default
@@ -543,7 +500,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals(32767, serverConfig.getReceiveBufferSize());
+ assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE, serverConfig.getReceiveBufferSize());
// Check value we set
_config.setProperty("connector.socketReceiveBuffer", "23");
@@ -557,7 +514,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals(32767, serverConfig.getWriteBufferSize());
+ assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE, serverConfig.getWriteBufferSize());
// Check value we set
_config.setProperty("connector.socketWriteBuffer", "23");
@@ -678,20 +635,6 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
assertEquals("a", serverConfig.getCertType());
}
- public void testGetQpidNIO() throws ConfigurationException
- {
- // Check default
- ServerConfiguration serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(false, serverConfig.getQpidNIO());
-
- // Check value we set
- _config.setProperty("connector.qpidnio", true);
- serverConfig = new ServerConfiguration(_config);
- serverConfig.initialise();
- assertEquals(true, serverConfig.getQpidNIO());
- }
-
public void testGetUseBiasedWrites() throws ConfigurationException
{
// Check default
@@ -756,7 +699,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
out.close();
out = new FileWriter(fileB);
- out.write("<broker><connector><ssl><port>2345</port></ssl><qpidnio>true</qpidnio></connector></broker>");
+ out.write("<broker><connector><ssl><port>2345</port></ssl></connector></broker>");
out.close();
ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile());
@@ -767,8 +710,6 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
assertEquals(1, config.getPorts().size());
assertEquals("2342", config.getPorts().get(0)); // From the first file, not
// present in the second
- assertEquals(true, config.getQpidNIO()); // From the second file, not
- // present in the first
}
public void testVariableInterpolation() throws Exception
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 40b332d216..98573c9cc3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -89,15 +89,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
- // TODO: use system property thingy for this
- if (System.getProperty("UseTransportIo", "false").equals("false"))
- {
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- }
- else
- {
- _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
- }
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+
_conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index eb5af119b2..424e09693f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -211,21 +211,6 @@ public class AMQProtocolHandler implements ProtocolEngine
}
/**
- * Called when we want to create a new IoTransport session
- * @param brokerDetail
- */
- public void createIoTransportSession(BrokerDetails brokerDetail)
- {
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager.setProtocolSession(_protocolSession);
- IoTransport.connect_0_9(getProtocolSession(),
- brokerDetail.getHost(),
- brokerDetail.getPort(),
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
- _protocolSession.init();
- }
-
- /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index aef3a563af..97657a09f4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -30,7 +30,6 @@ import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -116,20 +115,8 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- SocketConnector result;
- // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
- {
- _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
- ? "Qpid NIO is new default"
- : "Sysproperty 'qpidnio' is set"));
- result = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
- }
- else
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
- }
+ SocketConnector result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
+
// Don't have the connector's worker thread wait around for other connections (we only use
// one SocketConnector per connection at the moment anyway). This allows short-running
// clients (like unit tests) to complete quickly.
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
deleted file mode 100644
index 47f19aa76d..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.mina.filter;
-
-import org.apache.mina.common.IoFilter;/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-public class WriteBufferFullExeception extends RuntimeException
-{
- private IoFilter.WriteRequest _writeRequest;
-
- public WriteBufferFullExeception()
- {
- this(null);
- }
-
- public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest)
- {
- _writeRequest = writeRequest;
- }
-
-
- public void setWriteRequest(IoFilter.WriteRequest writeRequest)
- {
- _writeRequest = writeRequest;
- }
-
- public IoFilter.WriteRequest getWriteRequest()
- {
- return _writeRequest;
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
deleted file mode 100644
index 4e9db9071a..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.filter;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.DefaultIoFilterChainBuilder;
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.executor.ExecutorFilter;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than
- * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the
- * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to
- * cause an Out of Memory exception due to a back log of unprocessed messages.
- *
- * This is should only be viewed as a temporary work around for DIRMINA-302.
- *
- * A true solution should not be implemented as a filter as this issue will always occur. On a machine
- * where the network is slower than the local producer.
- *
- * Suggested improvement is to allow implementation of policices on what to do when buffer is full.
- *
- * They could be:
- * Block - As this does
- * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
- * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state
- *
- * <p/>
- * <p>Usage:
- * <p/>
- * <pre><code>
- * DefaultFilterChainBuilder builder = ...
- * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
- * filter.attach( builder );
- * </code></pre>
- * <p/>
- * or
- * <p/>
- * <pre><code>
- * IoFilterChain chain = ...
- * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
- * filter.attach( chain );
- * </code></pre>
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public class WriteBufferLimitFilterBuilder
-{
- public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize";
-
- private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000;
-
- private volatile boolean throwNotBlock = false;
-
- private volatile int maximumConnectionBufferCount;
- private volatile long maximumConnectionBufferSize;
-
- private final Object _blockLock = new Object();
-
- private int _blockWaiters = 0;
-
-
- public WriteBufferLimitFilterBuilder()
- {
- this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT);
- }
-
- public WriteBufferLimitFilterBuilder(int maxWriteBufferSize)
- {
- setMaximumConnectionBufferCount(maxWriteBufferSize);
- }
-
-
- /**
- * Set the maximum amount pending items in the writeQueue for a given session.
- * Changing the value will only take effect when new data is received for a
- * connection, including existing connections. Default value is 5000 msgs.
- *
- * @param maximumConnectionBufferCount New buffer size. Must be > 0
- */
- public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount)
- {
- this.maximumConnectionBufferCount = maximumConnectionBufferCount;
- this.maximumConnectionBufferSize = 0;
- }
-
- public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize)
- {
- this.maximumConnectionBufferSize = maximumConnectionBufferSize;
- this.maximumConnectionBufferCount = 0;
- }
-
- /**
- * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself
- * before and after that filter.
- *
- * @param chain {@link IoFilterChain} to attach self to.
- */
- public void attach(IoFilterChain chain)
- {
- String name = getThreadPoolFilterEntryName(chain.getAll());
-
- chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
- }
-
- /**
- * Attach this filter to the specified builder. It will search for the
- * {@link ExecutorFilter}, and attach itself before and after that filter.
- *
- * @param builder {@link DefaultIoFilterChainBuilder} to attach self to.
- */
- public void attach(DefaultIoFilterChainBuilder builder)
- {
- String name = getThreadPoolFilterEntryName(builder.getAll());
-
- builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit());
- }
-
- private String getThreadPoolFilterEntryName(List entries)
- {
- Iterator i = entries.iterator();
-
- while (i.hasNext())
- {
- IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next();
-
- if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class))
- {
- return entry.getName();
- }
- }
-
- throw new IllegalStateException("Chain does not contain a ExecutorFilter");
- }
-
-
- public class SendLimit extends IoFilterAdapter
- {
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception
- {
- try
- {
- waitTillSendAllowed(session);
- }
- catch (WriteBufferFullExeception wbfe)
- {
- nextFilter.exceptionCaught(session, wbfe);
- }
-
- if (writeRequest.getMessage() instanceof ByteBuffer)
- {
- increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage());
- }
-
- nextFilter.filterWrite(session, writeRequest);
- }
-
- private void increasePendingWriteSize(IoSession session, ByteBuffer message)
- {
- synchronized (session)
- {
- Long pendingSize = getScheduledWriteBytes(session) + message.remaining();
- session.setAttribute(PENDING_SIZE, pendingSize);
- }
- }
-
- private boolean sendAllowed(IoSession session)
- {
- if (session.isClosing())
- {
- return true;
- }
-
- int lmswm = maximumConnectionBufferCount;
- long lmswb = maximumConnectionBufferSize;
-
- return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm)
- && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb);
- }
-
- private long getScheduledWriteBytes(IoSession session)
- {
- synchronized (session)
- {
- Long i = (Long) session.getAttribute(PENDING_SIZE);
- return null == i ? 0 : i;
- }
- }
-
- private void waitTillSendAllowed(IoSession session)
- {
- synchronized (_blockLock)
- {
- if (throwNotBlock)
- {
- throw new WriteBufferFullExeception();
- }
-
- _blockWaiters++;
-
- while (!sendAllowed(session))
- {
- try
- {
- _blockLock.wait();
- }
- catch (InterruptedException e)
- {
- // Ignore.
- }
- }
- _blockWaiters--;
- }
- }
-
- public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- if (message instanceof ByteBuffer)
- {
- decrementPendingWriteSize(session, (ByteBuffer) message);
- }
- notifyWaitingWriters();
- nextFilter.messageSent(session, message);
- }
-
- private void decrementPendingWriteSize(IoSession session, ByteBuffer message)
- {
- synchronized (session)
- {
- session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining());
- }
- }
-
- private void notifyWaitingWriters()
- {
- synchronized (_blockLock)
- {
- if (_blockWaiters != 0)
- {
- _blockLock.notifyAll();
- }
- }
-
- }
-
- }//SentLimit
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
deleted file mode 100644
index e5360d32e0..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.util.NamePreservingRunnable;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-
-/**
- * {@link IoAcceptor} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public class MultiThreadSocketAcceptor extends SocketAcceptor
-{
- /**
- * @noinspection StaticNonFinalField
- */
- private static volatile int nextId = 0;
-
- private final Executor executor;
- private final Object lock = new Object();
- private final int id = nextId ++;
- private final String threadName = "SocketAcceptor-" + id;
- private final Map channels = new HashMap();
-
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
-
- private final MultiThreadSocketIoProcessor[] ioProcessors;
- private final int processorCount;
-
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
- private Selector selector;
- private Worker worker;
- private int processorDistributor = 0;
-
- /**
- * Create an acceptor with a single processing thread using a NewThreadExecutor
- */
- public MultiThreadSocketAcceptor()
- {
- this( 1, new NewThreadExecutor() );
- }
-
- /**
- * Create an acceptor with the desired number of processing threads
- *
- * @param processorCount Number of processing threads
- * @param executor Executor to use for launching threads
- */
- public MultiThreadSocketAcceptor( int processorCount, Executor executor )
- {
- if( processorCount < 1 )
- {
- throw new IllegalArgumentException( "Must have at least one processor" );
- }
-
- this.executor = executor;
- this.processorCount = processorCount;
- ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
-
- for( int i = 0; i < processorCount; i++ )
- {
- ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
- }
- }
-
-
- /**
- * Binds to the specified <code>address</code> and handles incoming connections with the specified
- * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
- *
- * @throws IOException if failed to bind
- */
- public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
- {
- if( handler == null )
- {
- throw new NullPointerException( "handler" );
- }
-
- if( address != null && !( address instanceof InetSocketAddress ) )
- {
- throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
- }
-
- if( config == null )
- {
- config = getDefaultConfig();
- }
-
- RegistrationRequest request = new RegistrationRequest( address, handler, config );
-
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
-
- startupWorker();
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
-
- if( request.exception != null )
- {
- throw request.exception;
- }
- }
-
-
- private synchronized void startupWorker() throws IOException
- {
- synchronized( lock )
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
-
- executor.execute( new NamePreservingRunnable( worker ) );
- }
- }
- }
-
- public void unbind( SocketAddress address )
- {
- if( address == null )
- {
- throw new NullPointerException( "address" );
- }
-
- CancellationRequest request = new CancellationRequest( address );
-
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
-
- if( request.exception != null )
- {
- request.exception.fillInStackTrace();
-
- throw request.exception;
- }
- }
-
-
- private class Worker implements Runnable
- {
- public void run()
- {
- Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName );
-
- for( ; ; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
-
- if( nKeys > 0 )
- {
- processSessions( selector.selectedKeys() );
- }
-
- cancelKeys();
-
- if( selector.keys().isEmpty() )
- {
- synchronized( lock )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e1 );
- }
- }
- }
- }
-
- private void processSessions( Set keys ) throws IOException
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- it.remove();
-
- if( !key.isAcceptable() )
- {
- continue;
- }
-
- ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
-
- SocketChannel ch = ssc.accept();
-
- if( ch == null )
- {
- continue;
- }
-
- boolean success = false;
- try
- {
-
- RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-
- MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl(
- MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(),
- req.config, ch, req.handler, req.address );
-
- // New Interface
-// SocketSessionImpl session = new SocketSessionImpl(
-// SocketAcceptor.this, nextProcessor(), getListeners(),
-// req.config, ch, req.handler, req.address );
-
-
- getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
- req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
- req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
- session.getIoProcessor().addNew( session );
- success = true;
- }
- catch( Throwable t )
- {
- ExceptionMonitor.getInstance().exceptionCaught( t );
- }
- finally
- {
- if( !success )
- {
- ch.close();
- }
- }
- }
- }
- }
-
- private MultiThreadSocketIoProcessor nextProcessor()
- {
- return ioProcessors[processorDistributor++ % processorCount];
- }
-
-
- private void registerNew()
- {
- if( registerQueue.isEmpty() )
- {
- return;
- }
-
- for( ; ; )
- {
- RegistrationRequest req;
-
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- {
- break;
- }
-
- ServerSocketChannel ssc = null;
-
- try
- {
- ssc = ServerSocketChannel.open();
- ssc.configureBlocking( false );
-
- // Configure the server socket,
- SocketAcceptorConfig cfg;
- if( req.config instanceof SocketAcceptorConfig )
- {
- cfg = ( SocketAcceptorConfig ) req.config;
- }
- else
- {
- cfg = ( SocketAcceptorConfig ) getDefaultConfig();
- }
-
- ssc.socket().setReuseAddress( cfg.isReuseAddress() );
- ssc.socket().setReceiveBufferSize(
- ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
-
- // and bind.
- ssc.socket().bind( req.address, cfg.getBacklog() );
- if( req.address == null || req.address.getPort() == 0 )
- {
- req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress();
- }
- ssc.register( selector, SelectionKey.OP_ACCEPT, req );
-
- synchronized( channels )
- {
- channels.put( req.address, ssc );
- }
-
- getListeners().fireServiceActivated(
- this, req.address, req.handler, req.config );
- }
- catch( IOException e )
- {
- req.exception = e;
- }
- finally
- {
- synchronized( req )
- {
- req.done = true;
-
- req.notifyAll();
- }
-
- if( ssc != null && req.exception != null )
- {
- try
- {
- ssc.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
- }
- }
-
-
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- {
- return;
- }
-
- for( ; ; )
- {
- CancellationRequest request;
-
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
-
- if( request == null )
- {
- break;
- }
-
- ServerSocketChannel ssc;
- synchronized( channels )
- {
- ssc = ( ServerSocketChannel ) channels.remove( request.address );
- }
-
- // close the channel
- try
- {
- if( ssc == null )
- {
- request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
- }
- else
- {
- SelectionKey key = ssc.keyFor( selector );
- request.registrationRequest = ( RegistrationRequest ) key.attachment();
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
-
- ssc.close();
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- synchronized( request )
- {
- request.done = true;
- request.notifyAll();
- }
-
- if( request.exception == null )
- {
- getListeners().fireServiceDeactivated(
- this, request.address,
- request.registrationRequest.handler,
- request.registrationRequest.config );
- }
- }
- }
- }
-
- private static class RegistrationRequest
- {
- private InetSocketAddress address;
- private final IoHandler handler;
- private final IoServiceConfig config;
- private IOException exception;
- private boolean done;
-
- private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
- {
- this.address = ( InetSocketAddress ) address;
- this.handler = handler;
- this.config = config;
- }
- }
-
-
- private static class CancellationRequest
- {
- private final SocketAddress address;
- private boolean done;
- private RegistrationRequest registrationRequest;
- private RuntimeException exception;
-
- private CancellationRequest( SocketAddress address )
- {
- this.address = address;
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
deleted file mode 100644
index 7344f70078..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoConnectorConfig;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.NewThreadExecutor;
-import org.apache.mina.util.Queue;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * {@link IoConnector} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public class MultiThreadSocketConnector extends SocketConnector
-{
- /** @noinspection StaticNonFinalField */
- private static volatile int nextId = 0;
-
- private final Object lock = new Object();
- private final int id = nextId++;
- private final String threadName = "SocketConnector-" + id;
-
- private final Queue connectQueue = new Queue();
- private final MultiThreadSocketIoProcessor[] ioProcessors;
- private final int processorCount;
- private final Executor executor;
-
- /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
- private Selector selector;
- private Worker worker;
- private int processorDistributor = 0;
- private int workerTimeout = 60; // 1 min.
-
- /** Create a connector with a single processing thread using a NewThreadExecutor */
- public MultiThreadSocketConnector()
- {
- this(1, new NewThreadExecutor());
- }
-
- /**
- * Create a connector with the desired number of processing threads
- *
- * @param processorCount Number of processing threads
- * @param executor Executor to use for launching threads
- */
- public MultiThreadSocketConnector(int processorCount, Executor executor)
- {
- if (processorCount < 1)
- {
- throw new IllegalArgumentException("Must have at least one processor");
- }
-
- this.executor = executor;
- this.processorCount = processorCount;
- ioProcessors = new MultiThreadSocketIoProcessor[processorCount];
-
- for (int i = 0; i < processorCount; i++)
- {
- ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
- }
- }
-
- /**
- * How many seconds to keep the connection thread alive between connection requests
- *
- * @return Number of seconds to keep connection thread alive
- */
- public int getWorkerTimeout()
- {
- return workerTimeout;
- }
-
- /**
- * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
- *
- * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
- */
- public void setWorkerTimeout(int workerTimeout)
- {
- if (workerTimeout < 0)
- {
- throw new IllegalArgumentException("Must be >= 0");
- }
- this.workerTimeout = workerTimeout;
- }
-
- public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
- {
- return connect(address, null, handler, config);
- }
-
- public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
- IoHandler handler, IoServiceConfig config)
- {
- if (address == null)
- {
- throw new NullPointerException("address");
- }
- if (handler == null)
- {
- throw new NullPointerException("handler");
- }
-
- if (!(address instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected address type: "
- + address.getClass());
- }
-
- if (localAddress != null && !(localAddress instanceof InetSocketAddress))
- {
- throw new IllegalArgumentException("Unexpected local address type: "
- + localAddress.getClass());
- }
-
- if (config == null)
- {
- config = getDefaultConfig();
- }
-
- SocketChannel ch = null;
- boolean success = false;
- try
- {
- ch = SocketChannel.open();
- ch.socket().setReuseAddress(true);
- if (localAddress != null)
- {
- ch.socket().bind(localAddress);
- }
-
- ch.configureBlocking(false);
-
- if (ch.connect(address))
- {
- DefaultConnectFuture future = new DefaultConnectFuture();
- newSession(ch, handler, config, future);
- success = true;
- return future;
- }
-
- success = true;
- }
- catch (IOException e)
- {
- return DefaultConnectFuture.newFailedFuture(e);
- }
- finally
- {
- if (!success && ch != null)
- {
- try
- {
- ch.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
-
- ConnectionRequest request = new ConnectionRequest(ch, handler, config);
- synchronized (lock)
- {
- try
- {
- startupWorker();
- }
- catch (IOException e)
- {
- try
- {
- ch.close();
- }
- catch (IOException e2)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e2);
- }
-
- return DefaultConnectFuture.newFailedFuture(e);
- }
- }
-
- synchronized (connectQueue)
- {
- connectQueue.push(request);
- }
- selector.wakeup();
-
- return request;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if (worker == null)
- {
- selector = Selector.open();
- worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker));
- }
- }
-
- private void registerNew()
- {
- if (connectQueue.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- ConnectionRequest req;
- synchronized (connectQueue)
- {
- req = (ConnectionRequest) connectQueue.pop();
- }
-
- if (req == null)
- {
- break;
- }
-
- SocketChannel ch = req.channel;
- try
- {
- ch.register(selector, SelectionKey.OP_CONNECT, req);
- }
- catch (IOException e)
- {
- req.setException(e);
- }
- }
- }
-
- private void processSessions(Set keys)
- {
- Iterator it = keys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- if (!key.isConnectable())
- {
- continue;
- }
-
- SocketChannel ch = (SocketChannel) key.channel();
- ConnectionRequest entry = (ConnectionRequest) key.attachment();
-
- boolean success = false;
- try
- {
- ch.finishConnect();
- newSession(ch, entry.handler, entry.config, entry);
- success = true;
- }
- catch (Throwable e)
- {
- entry.setException(e);
- }
- finally
- {
- key.cancel();
- if (!success)
- {
- try
- {
- ch.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
-
- keys.clear();
- }
-
- private void processTimedOutSessions(Set keys)
- {
- long currentTime = System.currentTimeMillis();
- Iterator it = keys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
-
- if (!key.isValid())
- {
- continue;
- }
-
- ConnectionRequest entry = (ConnectionRequest) key.attachment();
-
- if (currentTime >= entry.deadline)
- {
- entry.setException(new ConnectException());
- try
- {
- key.channel().close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- key.cancel();
- }
- }
- }
- }
-
- private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
- throws IOException
- {
- MultiThreadSocketSessionImpl session =
- new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(),
- config, ch, handler, ch.socket().getRemoteSocketAddress());
-
- //new interface
-// SocketSessionImpl session = new SocketSessionImpl(
-// this, nextProcessor(), getListeners(),
-// config, ch, handler, ch.socket().getRemoteSocketAddress() );
- try
- {
- getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- config.getThreadModel().buildFilterChain(session.getFilterChain());
- }
- catch (Throwable e)
- {
- throw (IOException) new IOException("Failed to create a session.").initCause(e);
- }
-
- // Set the ConnectFuture of the specified session, which will be
- // removed and notified by AbstractIoFilterChain eventually.
- session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture );
-
- // Forward the remaining process to the SocketIoProcessor.
- session.getIoProcessor().addNew(session);
- }
-
- private MultiThreadSocketIoProcessor nextProcessor()
- {
- return ioProcessors[processorDistributor++ % processorCount];
- }
-
- private class Worker implements Runnable
- {
- private long lastActive = System.currentTimeMillis();
-
- public void run()
- {
- Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName);
-
- for (; ;)
- {
- try
- {
- int nKeys = selector.select(1000);
-
- registerNew();
-
- if (nKeys > 0)
- {
- processSessions(selector.selectedKeys());
- }
-
- processTimedOutSessions(selector.keys());
-
- if (selector.keys().isEmpty())
- {
- if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
- {
- synchronized (lock)
- {
- if (selector.keys().isEmpty() &&
- connectQueue.isEmpty())
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- else
- {
- lastActive = System.currentTimeMillis();
- }
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
- }
- }
-
- private class ConnectionRequest extends DefaultConnectFuture
- {
- private final SocketChannel channel;
- private final long deadline;
- private final IoHandler handler;
- private final IoServiceConfig config;
-
- private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
- {
- this.channel = channel;
- long timeout;
- if (config instanceof IoConnectorConfig)
- {
- timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
- }
- else
- {
- timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
- }
- this.deadline = System.currentTimeMillis() + timeout;
- this.handler = handler;
- this.config = config;
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
deleted file mode 100644
index 67b8c8d820..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import java.io.IOException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoFilterChain} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- */
-class MultiThreadSocketFilterChain extends AbstractIoFilterChain {
-
- MultiThreadSocketFilterChain( IoSession parent )
- {
- super( parent );
- }
-
- protected void doWrite( IoSession session, WriteRequest writeRequest )
- {
- MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
- Queue writeRequestQueue = s.getWriteRequestQueue();
-
- // SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
- ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.push( writeRequest );
- if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
- {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush( s );
- }
- }
- }
-
- protected void doClose( IoSession session ) throws IOException
- {
- MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session;
- s.getIoProcessor().remove( s );
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
deleted file mode 100644
index c23ad8686f..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
+++ /dev/null
@@ -1,1026 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.util.IdentityHashSet;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $,
- */
-class MultiThreadSocketIoProcessor extends SocketIoProcessor
-{
- Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
- Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader");
- Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer");
-
- private static final long SELECTOR_TIMEOUT = 1000L;
-
- private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
- private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K
-
- private final Object readLock = new Object();
- private final Object writeLock = new Object();
-
- private final String threadName;
- private final Executor executor;
-
- private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
-
- /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
- private volatile Selector selector, writeSelector;
-
- private final Queue newSessions = new Queue();
- private final Queue removingSessions = new Queue();
- private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
- private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();
-
- private final Queue trafficControllingSessions = new Queue();
-
- private ReadWorker readWorker;
- private WriteWorker writeWorker;
- private long lastIdleReadCheckTime = System.currentTimeMillis();
- private long lastIdleWriteCheckTime = System.currentTimeMillis();
-
- MultiThreadSocketIoProcessor(String threadName, Executor executor)
- {
- super(threadName, executor);
- this.threadName = threadName;
- this.executor = executor;
- }
-
- void addNew(SocketSessionImpl session) throws IOException
- {
- synchronized (newSessions)
- {
- newSessions.push(session);
- }
-
- startupWorker();
-
- selector.wakeup();
- writeSelector.wakeup();
- }
-
- void remove(SocketSessionImpl session) throws IOException
- {
- scheduleRemove(session);
- startupWorker();
- selector.wakeup();
- }
-
- private void startupWorker() throws IOException
- {
- synchronized (readLock)
- {
- if (readWorker == null)
- {
- selector = Selector.open();
- readWorker = new ReadWorker();
- executor.execute(new NamePreservingRunnable(readWorker));
- }
- }
-
- synchronized (writeLock)
- {
- if (writeWorker == null)
- {
- writeSelector = Selector.open();
- writeWorker = new WriteWorker();
- executor.execute(new NamePreservingRunnable(writeWorker));
- }
- }
-
- }
-
- void flush(SocketSessionImpl session)
- {
- scheduleFlush(session);
- Selector selector = this.writeSelector;
-
- if (selector != null)
- {
- selector.wakeup();
- }
- }
-
- void updateTrafficMask(SocketSessionImpl session)
- {
- scheduleTrafficControl(session);
- Selector selector = this.selector;
- if (selector != null)
- {
- selector.wakeup();
- }
- }
-
- private void scheduleRemove(SocketSessionImpl session)
- {
- synchronized (removingSessions)
- {
- removingSessions.push(session);
- }
- }
-
- private void scheduleFlush(SocketSessionImpl session)
- {
- synchronized (flushingSessionsSet)
- {
- //if flushingSessions grows to contain Integer.MAX_VALUE sessions
- // then this will fail.
- if (flushingSessionsSet.add(session))
- {
- flushingSessions.offer(session);
- }
- }
- }
-
- private void scheduleTrafficControl(SocketSessionImpl session)
- {
- synchronized (trafficControllingSessions)
- {
- trafficControllingSessions.push(session);
- }
- }
-
- private void doAddNewReader() throws InterruptedException
- {
- if (newSessions.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- MultiThreadSocketSessionImpl session;
-
- synchronized (newSessions)
- {
- session = (MultiThreadSocketSessionImpl) newSessions.peek();
- }
-
- if (session == null)
- {
- break;
- }
-
- SocketChannel ch = session.getChannel();
-
-
- try
- {
-
- ch.configureBlocking(false);
- session.setSelectionKey(ch.register(selector,
- SelectionKey.OP_READ,
- session));
-
- //System.out.println("ReadDebug:"+"Awaiting Registration");
- session.awaitRegistration();
- sessionCreated(session);
- }
- catch (IOException e)
- {
- // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
- // and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
- }
-
-
- private void doAddNewWrite() throws InterruptedException
- {
- if (newSessions.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- MultiThreadSocketSessionImpl session;
-
- synchronized (newSessions)
- {
- session = (MultiThreadSocketSessionImpl) newSessions.peek();
- }
-
- if (session == null)
- {
- break;
- }
-
- SocketChannel ch = session.getChannel();
-
- try
- {
- ch.configureBlocking(false);
- synchronized (flushingSessionsSet)
- {
- flushingSessionsSet.add(session);
- }
-
- session.setWriteSelectionKey(ch.register(writeSelector,
- SelectionKey.OP_WRITE,
- session));
-
- //System.out.println("WriteDebug:"+"Awaiting Registration");
- session.awaitRegistration();
- sessionCreated(session);
- }
- catch (IOException e)
- {
-
- // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
- // and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
- }
-
-
- private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
- {
- MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
- synchronized (newSessions)
- {
- if (!session.created())
- {
- _logger.debug("Popping new session");
- newSessions.pop();
-
- // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
- // in AbstractIoFilterChain.fireSessionOpened().
- session.getServiceListeners().fireSessionCreated(session);
-
- session.doneCreation();
- }
- }
- }
-
- private void doRemove()
- {
- if (removingSessions.isEmpty())
- {
- return;
- }
-
- for (; ;)
- {
- MultiThreadSocketSessionImpl session;
-
- synchronized (removingSessions)
- {
- session = (MultiThreadSocketSessionImpl) removingSessions.pop();
- }
-
- if (session == null)
- {
- break;
- }
-
- SocketChannel ch = session.getChannel();
- SelectionKey key = session.getReadSelectionKey();
- SelectionKey writeKey = session.getWriteSelectionKey();
-
- // Retry later if session is not yet fully initialized.
- // (In case that Session.close() is called before addSession() is processed)
- if (key == null || writeKey == null)
- {
- scheduleRemove(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid() || !writeKey.isValid())
- {
- continue;
- }
-
- try
- {
- //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
- synchronized (readLock)
- {
- key.cancel();
- }
- synchronized (writeLock)
- {
- writeKey.cancel();
- }
- ch.close();
- }
- catch (IOException e)
- {
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- finally
- {
- releaseWriteBuffers(session);
- session.getServiceListeners().fireSessionDestroyed(session);
- }
- }
- }
-
- private void processRead(Set selectedKeys)
- {
- Iterator it = selectedKeys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
- MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
-
- synchronized (readLock)
- {
- if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
- {
- read(session);
- }
- }
-
- }
-
- selectedKeys.clear();
- }
-
- private void processWrite(Set selectedKeys)
- {
- Iterator it = selectedKeys.iterator();
-
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
- SocketSessionImpl session = (SocketSessionImpl) key.attachment();
-
- synchronized (writeLock)
- {
- if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
- {
-
- // Clear OP_WRITE
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- synchronized (flushingSessionsSet)
- {
- flushingSessions.offer(session);
- }
- }
- }
- }
-
- selectedKeys.clear();
- }
-
- private void read(SocketSessionImpl session)
- {
-
- //if (_loggerWrite.isDebugEnabled())
- {
- //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session));
- }
-
- int totalReadBytes = 0;
-
- while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
- {
- ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
- SocketChannel ch = session.getChannel();
-
- try
- {
- buf.clear();
-
- int readBytes = 0;
- int ret;
-
- try
- {
- while ((ret = ch.read(buf.buf())) > 0)
- {
- readBytes += ret;
- totalReadBytes += ret;
- }
- }
- finally
- {
- buf.flip();
- }
-
-
- if (readBytes > 0)
- {
- session.increaseReadBytes(readBytes);
-
- session.getFilterChain().fireMessageReceived(session, buf);
- buf = null;
- }
-
- if (ret <= 0)
- {
- if (ret == 0)
- {
- if (readBytes == session.getReadBufferSize())
- {
- continue;
- }
- }
- else
- {
- scheduleRemove(session);
- }
-
- break;
- }
- }
- catch (Throwable e)
- {
- if (e instanceof IOException)
- {
- scheduleRemove(session);
- }
- session.getFilterChain().fireExceptionCaught(session, e);
-
- //Stop Reading this session.
- return;
- }
- finally
- {
- if (buf != null)
- {
- buf.release();
- }
- }
- }//for
-
- // if (_loggerWrite.isDebugEnabled())
- {
- //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes);
- }
- }
-
-
- private void notifyReadIdleness()
- {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if ((currentTime - lastIdleReadCheckTime) >= 1000)
- {
- lastIdleReadCheckTime = currentTime;
- Set keys = selector.keys();
- if (keys != null)
- {
- for (Iterator it = keys.iterator(); it.hasNext();)
- {
- SelectionKey key = (SelectionKey) it.next();
- SocketSessionImpl session = (SocketSessionImpl) key.attachment();
- notifyReadIdleness(session, currentTime);
- }
- }
- }
- }
-
- private void notifyWriteIdleness()
- {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if ((currentTime - lastIdleWriteCheckTime) >= 1000)
- {
- lastIdleWriteCheckTime = currentTime;
- Set keys = writeSelector.keys();
- if (keys != null)
- {
- for (Iterator it = keys.iterator(); it.hasNext();)
- {
- SelectionKey key = (SelectionKey) it.next();
- SocketSessionImpl session = (SocketSessionImpl) key.attachment();
- notifyWriteIdleness(session, currentTime);
- }
- }
- }
- }
-
- private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
- {
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
- IdleStatus.BOTH_IDLE,
- Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
- IdleStatus.READER_IDLE,
- Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
-
- notifyWriteTimeout(session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime());
- }
-
- private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
- {
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
- IdleStatus.BOTH_IDLE,
- Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
- IdleStatus.WRITER_IDLE,
- Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-
- notifyWriteTimeout(session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime());
- }
-
- private void notifyIdleness0(SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime)
- {
- if (idleTime > 0 && lastIoTime != 0
- && (currentTime - lastIoTime) >= idleTime)
- {
- session.increaseIdleCount(status);
- session.getFilterChain().fireSessionIdle(session, status);
- }
- }
-
- private void notifyWriteTimeout(SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime)
- {
-
- MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
- SelectionKey key = sesh.getWriteSelectionKey();
-
- synchronized (writeLock)
- {
- if (writeTimeout > 0
- && (currentTime - lastIoTime) >= writeTimeout
- && key != null && key.isValid()
- && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
- {
- session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
- }
- }
- }
-
- private SocketSessionImpl getNextFlushingSession()
- {
- return (SocketSessionImpl) flushingSessions.poll();
- }
-
- private void releaseSession(SocketSessionImpl session)
- {
- synchronized (session.getWriteRequestQueue())
- {
- synchronized (flushingSessionsSet)
- {
- if (session.getScheduledWriteRequests() > 0)
- {
- if (_loggerWrite.isDebugEnabled())
- {
- //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session));
- }
- flushingSessions.offer(session);
- }
- else
- {
- if (_loggerWrite.isDebugEnabled())
- {
- //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session));
- }
- flushingSessionsSet.remove(session);
- }
- }
- }
- }
-
- private void releaseWriteBuffers(SocketSessionImpl session)
- {
- Queue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
-
- //Should this be synchronized?
- synchronized (writeRequestQueue)
- {
- while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
- {
- try
- {
- ((ByteBuffer) req.getMessage()).release();
- }
- catch (IllegalStateException e)
- {
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- finally
- {
- req.getFuture().setWritten(false);
- }
- }
- }
- }
-
- private void doFlush()
- {
- MultiThreadSocketSessionImpl session;
-
- while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
- {
- if (!session.isConnected())
- {
- releaseWriteBuffers(session);
- releaseSession(session);
- continue;
- }
-
- SelectionKey key = session.getWriteSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession() is processed)
- if (key == null)
- {
- scheduleFlush(session);
- releaseSession(session);
- continue;
- }
- // skip if channel is already closed
- if (!key.isValid())
- {
- releaseSession(session);
- continue;
- }
-
- try
- {
- if (doFlush(session))
- {
- releaseSession(session);
- }
- }
- catch (IOException e)
- {
- releaseSession(session);
- scheduleRemove(session);
- session.getFilterChain().fireExceptionCaught(session, e);
- }
-
- }
-
- }
-
- private boolean doFlush(SocketSessionImpl sessionParam) throws IOException
- {
- MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
- // Clear OP_WRITE
- SelectionKey key = session.getWriteSelectionKey();
- synchronized (writeLock)
- {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- }
- SocketChannel ch = session.getChannel();
- Queue writeRequestQueue = session.getWriteRequestQueue();
-
- long totalFlushedBytes = 0;
- while (true)
- {
- WriteRequest req;
-
- synchronized (writeRequestQueue)
- {
- req = (WriteRequest) writeRequestQueue.first();
- }
-
- if (req == null)
- {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0)
- {
- synchronized (writeRequestQueue)
- {
- writeRequestQueue.pop();
- }
-
- session.increaseWrittenMessages();
-
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
-
- int writtenBytes = 0;
-
- // Reported as DIRMINA-362
- //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
- if (key.isWritable())
- {
- writtenBytes = ch.write(buf.buf());
- totalFlushedBytes += writtenBytes;
- }
-
- if (writtenBytes > 0)
- {
- session.increaseWrittenBytes(writtenBytes);
- }
-
- if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
- {
- // Kernel buffer is full
- synchronized (writeLock)
- {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- }
- if (_loggerWrite.isDebugEnabled())
- {
- //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
- }
- return false;
- }
- }
-
- if (_loggerWrite.isDebugEnabled())
- {
- //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes");
- }
- return true;
- }
-
- private void doUpdateTrafficMask()
- {
- if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked())
- {
- return;
- }
-
- // Synchronize over entire operation as this method should be called
- // from both read and write thread and we don't want the order of the
- // updates to get changed.
- trafficMaskUpdateLock.lock();
- try
- {
- for (; ;)
- {
- MultiThreadSocketSessionImpl session;
-
- session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop();
-
- if (session == null)
- {
- break;
- }
-
- SelectionKey key = session.getReadSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- if (key == null)
- {
- scheduleTrafficControl(session);
- break;
- }
- // skip if channel is already closed
- if (!key.isValid())
- {
- continue;
- }
-
- // The normal is OP_READ and, if there are write requests in the
- // session's write queue, set OP_WRITE to trigger flushing.
-
- //Sset to Read and Write if there is nothing then the cost
- // is one loop through the flusher.
- int ops = SelectionKey.OP_READ;
-
- // Now mask the preferred ops with the mask of the current session
- int mask = session.getTrafficMask().getInterestOps();
- synchronized (readLock)
- {
- key.interestOps(ops & mask);
- }
- //Change key to the WriteSelection Key
- key = session.getWriteSelectionKey();
- if (key != null && key.isValid())
- {
- Queue writeRequestQueue = session.getWriteRequestQueue();
- synchronized (writeRequestQueue)
- {
- if (!writeRequestQueue.isEmpty())
- {
- ops = SelectionKey.OP_WRITE;
- synchronized (writeLock)
- {
- key.interestOps(ops & mask);
- }
- }
- }
- }
- }
- }
- finally
- {
- trafficMaskUpdateLock.unlock();
- }
-
- }
-
- private class WriteWorker implements Runnable
- {
-
- public void run()
- {
- Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer");
-
- //System.out.println("WriteDebug:"+"Startup");
- for (; ;)
- {
- try
- {
- int nKeys = writeSelector.select(SELECTOR_TIMEOUT);
-
- doAddNewWrite();
- doUpdateTrafficMask();
-
- if (nKeys > 0)
- {
- //System.out.println("WriteDebug:"+nKeys + " keys from writeselector");
- processWrite(writeSelector.selectedKeys());
- }
- else
- {
- //System.out.println("WriteDebug:"+"No keys from writeselector");
- }
-
- doRemove();
- notifyWriteIdleness();
-
- if (flushingSessionsSet.size() > 0)
- {
- doFlush();
- }
-
- if (writeSelector.keys().isEmpty())
- {
- synchronized (writeLock)
- {
-
- if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
- {
- writeWorker = null;
- try
- {
- writeSelector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- writeSelector = null;
- }
-
- break;
- }
- }
- }
-
- }
- catch (Throwable t)
- {
- ExceptionMonitor.getInstance().exceptionCaught(t);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
- //System.out.println("WriteDebug:"+"Shutdown");
- }
-
- }
-
- private class ReadWorker implements Runnable
- {
-
- public void run()
- {
- Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
-
- //System.out.println("ReadDebug:"+"Startup");
- for (; ;)
- {
- try
- {
- int nKeys = selector.select(SELECTOR_TIMEOUT);
-
- doAddNewReader();
- doUpdateTrafficMask();
-
- if (nKeys > 0)
- {
- //System.out.println("ReadDebug:"+nKeys + " keys from selector");
-
- processRead(selector.selectedKeys());
- }
- else
- {
- //System.out.println("ReadDebug:"+"No keys from selector");
- }
-
-
- doRemove();
- notifyReadIdleness();
-
- if (selector.keys().isEmpty())
- {
-
- synchronized (readLock)
- {
- if (selector.keys().isEmpty() && newSessions.isEmpty())
- {
- readWorker = null;
- try
- {
- selector.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- finally
- {
- selector = null;
- }
-
- break;
- }
- }
- }
- }
- catch (Throwable t)
- {
- ExceptionMonitor.getInstance().exceptionCaught(t);
-
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e1)
- {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
- //System.out.println("ReadDebug:"+"Shutdown");
- }
-
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
deleted file mode 100644
index 043d4800b6..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoConnectorConfig;
-import org.apache.mina.common.support.BaseIoSessionConfig;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketException;
-
-/**
- * An {@link IoConnectorConfig} for {@link SocketConnector}.
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl
-{
- private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
- private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false;
- private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false;
- private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false;
-
- private static boolean DEFAULT_REUSE_ADDRESS;
- private static int DEFAULT_RECEIVE_BUFFER_SIZE;
- private static int DEFAULT_SEND_BUFFER_SIZE;
- private static int DEFAULT_TRAFFIC_CLASS;
- private static boolean DEFAULT_KEEP_ALIVE;
- private static boolean DEFAULT_OOB_INLINE;
- private static int DEFAULT_SO_LINGER;
- private static boolean DEFAULT_TCP_NO_DELAY;
-
- static
- {
- initialize();
- }
-
- private static void initialize()
- {
- Socket socket = null;
-
- socket = new Socket();
-
- try
- {
- DEFAULT_REUSE_ADDRESS = socket.getReuseAddress();
- DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize();
- DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize();
- DEFAULT_KEEP_ALIVE = socket.getKeepAlive();
- DEFAULT_OOB_INLINE = socket.getOOBInline();
- DEFAULT_SO_LINGER = socket.getSoLinger();
- DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay();
-
- // Check if setReceiveBufferSize is supported.
- try
- {
- socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE);
- SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true;
- }
- catch( SocketException e )
- {
- SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false;
- }
-
- // Check if setSendBufferSize is supported.
- try
- {
- socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE);
- SET_SEND_BUFFER_SIZE_AVAILABLE = true;
- }
- catch( SocketException e )
- {
- SET_SEND_BUFFER_SIZE_AVAILABLE = false;
- }
-
- // Check if getTrafficClass is supported.
- try
- {
- DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass();
- GET_TRAFFIC_CLASS_AVAILABLE = true;
- }
- catch( SocketException e )
- {
- GET_TRAFFIC_CLASS_AVAILABLE = false;
- DEFAULT_TRAFFIC_CLASS = 0;
- }
- }
- catch( SocketException e )
- {
- throw new ExceptionInInitializerError(e);
- }
- finally
- {
- if( socket != null )
- {
- try
- {
- socket.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- }
- }
- }
- }
-
- public static boolean isSetReceiveBufferSizeAvailable() {
- return SET_RECEIVE_BUFFER_SIZE_AVAILABLE;
- }
-
- public static boolean isSetSendBufferSizeAvailable() {
- return SET_SEND_BUFFER_SIZE_AVAILABLE;
- }
-
- public static boolean isGetTrafficClassAvailable() {
- return GET_TRAFFIC_CLASS_AVAILABLE;
- }
-
- public static boolean isSetTrafficClassAvailable() {
- return SET_TRAFFIC_CLASS_AVAILABLE;
- }
-
- private boolean reuseAddress = DEFAULT_REUSE_ADDRESS;
- private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
- private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
- private int trafficClass = DEFAULT_TRAFFIC_CLASS;
- private boolean keepAlive = DEFAULT_KEEP_ALIVE;
- private boolean oobInline = DEFAULT_OOB_INLINE;
- private int soLinger = DEFAULT_SO_LINGER;
- private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
-
- /**
- * Creates a new instance.
- */
- MultiThreadSocketSessionConfigImpl()
- {
- }
-
- public boolean isReuseAddress()
- {
- return reuseAddress;
- }
-
- public void setReuseAddress( boolean reuseAddress )
- {
- this.reuseAddress = reuseAddress;
- }
-
- public int getReceiveBufferSize()
- {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize( int receiveBufferSize )
- {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public int getSendBufferSize()
- {
- return sendBufferSize;
- }
-
- public void setSendBufferSize( int sendBufferSize )
- {
- this.sendBufferSize = sendBufferSize;
- }
-
- public int getTrafficClass()
- {
- return trafficClass;
- }
-
- public void setTrafficClass( int trafficClass )
- {
- this.trafficClass = trafficClass;
- }
-
- public boolean isKeepAlive()
- {
- return keepAlive;
- }
-
- public void setKeepAlive( boolean keepAlive )
- {
- this.keepAlive = keepAlive;
- }
-
- public boolean isOobInline()
- {
- return oobInline;
- }
-
- public void setOobInline( boolean oobInline )
- {
- this.oobInline = oobInline;
- }
-
- public int getSoLinger()
- {
- return soLinger;
- }
-
- public void setSoLinger( int soLinger )
- {
- this.soLinger = soLinger;
- }
-
- public boolean isTcpNoDelay()
- {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay( boolean tcpNoDelay )
- {
- this.tcpNoDelay = tcpNoDelay;
- }
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
deleted file mode 100644
index be4a2d289d..0000000000
--- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.socket.nio;
-
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.common.support.IoServiceListenerSupport;
-import org.apache.mina.util.Queue;
-
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An {@link IoSession} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $
- */
-class MultiThreadSocketSessionImpl extends SocketSessionImpl
-{
- private final IoService manager;
- private final IoServiceConfig serviceConfig;
- private final SocketSessionConfig config = new SessionConfigImpl();
- private final MultiThreadSocketIoProcessor ioProcessor;
- private final MultiThreadSocketFilterChain filterChain;
- private final SocketChannel ch;
- private final Queue writeRequestQueue;
- private final IoHandler handler;
- private final SocketAddress remoteAddress;
- private final SocketAddress localAddress;
- private final SocketAddress serviceAddress;
- private final IoServiceListenerSupport serviceListeners;
- private SelectionKey readKey, writeKey;
- private int readBufferSize;
- private CountDownLatch registeredReadyLatch = new CountDownLatch(2);
- private AtomicBoolean created = new AtomicBoolean(false);
-
- /**
- * Creates a new instance.
- */
- MultiThreadSocketSessionImpl( IoService manager,
- SocketIoProcessor ioProcessor,
- IoServiceListenerSupport listeners,
- IoServiceConfig serviceConfig,
- SocketChannel ch,
- IoHandler defaultHandler,
- SocketAddress serviceAddress )
- {
- super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress);
- this.manager = manager;
- this.serviceListeners = listeners;
- this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor;
- this.filterChain = new MultiThreadSocketFilterChain(this);
- this.ch = ch;
- this.writeRequestQueue = new Queue();
- this.handler = defaultHandler;
- this.remoteAddress = ch.socket().getRemoteSocketAddress();
- this.localAddress = ch.socket().getLocalSocketAddress();
- this.serviceAddress = serviceAddress;
- this.serviceConfig = serviceConfig;
-
- // Apply the initial session settings
- IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
- if( sessionConfig instanceof SocketSessionConfig )
- {
- SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig;
- this.config.setKeepAlive( cfg.isKeepAlive() );
- this.config.setOobInline( cfg.isOobInline() );
- this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() );
- this.readBufferSize = cfg.getReceiveBufferSize();
- this.config.setReuseAddress( cfg.isReuseAddress() );
- this.config.setSendBufferSize( cfg.getSendBufferSize() );
- this.config.setSoLinger( cfg.getSoLinger() );
- this.config.setTcpNoDelay( cfg.isTcpNoDelay() );
-
- if( this.config.getTrafficClass() != cfg.getTrafficClass() )
- {
- this.config.setTrafficClass( cfg.getTrafficClass() );
- }
- }
- }
-
- void awaitRegistration() throws InterruptedException
- {
- registeredReadyLatch.countDown();
-
- registeredReadyLatch.await();
- }
-
- boolean created() throws InterruptedException
- {
- return created.get();
- }
-
- void doneCreation()
- {
- created.getAndSet(true);
- }
-
- public IoService getService()
- {
- return manager;
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return serviceConfig;
- }
-
- public IoSessionConfig getConfig()
- {
- return config;
- }
-
- SocketIoProcessor getIoProcessor()
- {
- return ioProcessor;
- }
-
- public IoFilterChain getFilterChain()
- {
- return filterChain;
- }
-
- SocketChannel getChannel()
- {
- return ch;
- }
-
- IoServiceListenerSupport getServiceListeners()
- {
- return serviceListeners;
- }
-
- SelectionKey getSelectionKey()
- {
- return readKey;
- }
-
- SelectionKey getReadSelectionKey()
- {
- return readKey;
- }
-
- SelectionKey getWriteSelectionKey()
- {
- return writeKey;
- }
-
- void setSelectionKey(SelectionKey key)
- {
- this.readKey = key;
- }
-
- void setWriteSelectionKey(SelectionKey key)
- {
- this.writeKey = key;
- }
-
- public IoHandler getHandler()
- {
- return handler;
- }
-
- protected void close0()
- {
- filterChain.fireFilterClose( this );
- }
-
- Queue getWriteRequestQueue()
- {
- return writeRequestQueue;
- }
-
- /**
- @return int Number of write scheduled write requests
- @deprecated
- */
- public int getScheduledWriteMessages()
- {
- return getScheduledWriteRequests();
- }
-
- public int getScheduledWriteRequests()
- {
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.size();
- }
- }
-
- public int getScheduledWriteBytes()
- {
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.byteSize();
- }
- }
-
- protected void write0( WriteRequest writeRequest )
- {
- filterChain.fireFilterWrite( this, writeRequest );
- }
-
- public TransportType getTransportType()
- {
- return TransportType.SOCKET;
- }
-
- public SocketAddress getRemoteAddress()
- {
- //This is what I had previously
-// return ch.socket().getRemoteSocketAddress();
- return remoteAddress;
- }
-
- public SocketAddress getLocalAddress()
- {
- //This is what I had previously
-// return ch.socket().getLocalSocketAddress();
- return localAddress;
- }
-
- public SocketAddress getServiceAddress()
- {
- return serviceAddress;
- }
-
- protected void updateTrafficMask()
- {
- this.ioProcessor.updateTrafficMask( this );
- }
-
- int getReadBufferSize()
- {
- return readBufferSize;
- }
-
- private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
- {
- public boolean isKeepAlive()
- {
- try
- {
- return ch.socket().getKeepAlive();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setKeepAlive( boolean on )
- {
- try
- {
- ch.socket().setKeepAlive( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public boolean isOobInline()
- {
- try
- {
- return ch.socket().getOOBInline();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setOobInline( boolean on )
- {
- try
- {
- ch.socket().setOOBInline( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public boolean isReuseAddress()
- {
- try
- {
- return ch.socket().getReuseAddress();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setReuseAddress( boolean on )
- {
- try
- {
- ch.socket().setReuseAddress( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public int getSoLinger()
- {
- try
- {
- return ch.socket().getSoLinger();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setSoLinger( int linger )
- {
- try
- {
- if( linger < 0 )
- {
- ch.socket().setSoLinger( false, 0 );
- }
- else
- {
- ch.socket().setSoLinger( true, linger );
- }
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public boolean isTcpNoDelay()
- {
- try
- {
- return ch.socket().getTcpNoDelay();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setTcpNoDelay( boolean on )
- {
- try
- {
- ch.socket().setTcpNoDelay( on );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public int getTrafficClass()
- {
- if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
- {
- try
- {
- return ch.socket().getTrafficClass();
- }
- catch( SocketException e )
- {
- // Throw an exception only when setTrafficClass is also available.
- if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
- {
- throw new RuntimeIOException( e );
- }
- }
- }
-
- return 0;
- }
-
- public void setTrafficClass( int tc )
- {
- if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
- {
- try
- {
- ch.socket().setTrafficClass( tc );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
- }
-
- public int getSendBufferSize()
- {
- try
- {
- return ch.socket().getSendBufferSize();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setSendBufferSize( int size )
- {
- if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
- {
- try
- {
- ch.socket().setSendBufferSize( size );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
- }
-
- public int getReceiveBufferSize()
- {
- try
- {
- return ch.socket().getReceiveBufferSize();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
-
- public void setReceiveBufferSize( int size )
- {
- if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
- {
- try
- {
- ch.socket().setReceiveBufferSize( size );
- MultiThreadSocketSessionImpl.this.readBufferSize = size;
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
deleted file mode 100644
index 5423bbb68f..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid;
-
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.mina.MinaHandler;
-
-import static org.apache.qpid.transport.util.Functions.str;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-
-/**
- * ToyBroker
- *
- * @author Rafael H. Schloming
- */
-
-class ToyBroker extends SessionDelegate
-{
-
- private ToyExchange exchange;
- private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
-
- public ToyBroker(ToyExchange exchange)
- {
- this.exchange = exchange;
- }
-
- public void messageAcquire(Session context, MessageAcquire struct)
- {
- System.out.println("\n==================> messageAcquire " );
- context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
- }
-
- @Override public void queueDeclare(Session ssn, QueueDeclare qd)
- {
- exchange.createQueue(qd.getQueue());
- System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
- }
-
- @Override public void exchangeBind(Session ssn, ExchangeBind qb)
- {
- exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
- System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
- }
-
- @Override public void queueQuery(Session ssn, QueueQuery qq)
- {
- QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
- ssn.executionResult((int) qq.getId(), result);
- }
-
- @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
- {
- Consumer c = new Consumer();
- c._queueName = ms.getQueue();
- consumers.put(ms.getDestination(),c);
- System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
- }
-
- @Override public void messageFlow(Session ssn,MessageFlow struct)
- {
- Consumer c = consumers.get(struct.getDestination());
- c._credit = struct.getValue();
- System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
- }
-
- @Override public void messageFlush(Session ssn,MessageFlush struct)
- {
- System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
- checkAndSendMessagesToConsumer(ssn,struct.getDestination());
- }
-
- @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
- {
- String dest = xfr.getDestination();
- System.out.println("received transfer " + dest);
- Header header = xfr.getHeader();
- DeliveryProperties props = header.get(DeliveryProperties.class);
- if (props != null)
- {
- System.out.println("received headers routing_key " + props.getRoutingKey());
- }
-
- MessageProperties mp = header.get(MessageProperties.class);
- System.out.println("MP: " + mp);
- if (mp != null)
- {
- System.out.println(mp.getApplicationHeaders());
- }
-
- if (exchange.route(dest,props == null ? null : props.getRoutingKey(),xfr))
- {
- System.out.println("queued " + xfr);
- dispatchMessages(ssn);
- }
- else
- {
-
- if (props == null || !props.getDiscardUnroutable())
- {
- RangeSet ranges = new RangeSet();
- ranges.add(xfr.getId());
- ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
- "no such destination");
- }
- }
- ssn.processed(xfr);
- }
-
- private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
- {
- System.out.println("\n==================> Transfering message to: " +dest + "\n");
- ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED,
- m.getHeader(), m.getBody());
- }
-
- private void dispatchMessages(Session ssn)
- {
- for (String dest: consumers.keySet())
- {
- checkAndSendMessagesToConsumer(ssn,dest);
- }
- }
-
- private void checkAndSendMessagesToConsumer(Session ssn,String dest)
- {
- Consumer c = consumers.get(dest);
- LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
- MessageTransfer m = queue.poll();
- while (m != null && c._credit>0)
- {
- transferMessageToPeer(ssn,dest,m);
- c._credit--;
- m = queue.poll();
- }
- }
-
- // ugly, but who cares :)
- // assumes unit is always no of messages, not bytes
- // assumes it's credit mode and not window
- private static class Consumer
- {
- long _credit;
- String _queueName;
- }
-
- private static final class ToyBrokerSession extends Session
- {
-
- public ToyBrokerSession(Connection connection, Binary name, long expiry, ToyExchange exchange)
- {
- super(connection, new ToyBroker(exchange), name, expiry);
- }
- }
-
- public static final void main(String[] args) throws IOException
- {
- final ToyExchange exchange = new ToyExchange();
- ConnectionDelegate delegate = new ServerDelegate()
- {
- @Override
- public void init(Connection conn, ProtocolHeader hdr)
- {
- conn.setSessionFactory(new Connection.SessionFactory()
- {
- public Session newSession(Connection conn, Binary name, long expiry)
- {
- return new ToyBrokerSession(conn, name, expiry, exchange);
- }
- });
-
- super.init(conn, hdr); //To change body of overridden methods use File | Settings | File Templates.
- }
-
- };
-
- MinaHandler.accept("0.0.0.0", 5672, delegate);
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
deleted file mode 100644
index 5b2db10613..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid;
-
-import java.nio.*;
-import java.util.*;
-
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.mina.MinaHandler;
-
-
-/**
- * ToyClient
- *
- * @author Rafael H. Schloming
- */
-
-class ToyClient implements SessionListener
-{
- public void opened(Session ssn) {}
-
- public void resumed(Session ssn) {}
-
- public void exception(Session ssn, SessionException exc)
- {
- exc.printStackTrace();
- }
-
- public void message(Session ssn, MessageTransfer xfr)
- {
- System.out.println("msg: " + xfr);
- }
-
- public void closed(Session ssn) {}
-
- public static final void main(String[] args)
- {
- Connection conn = new Connection();
- conn.connect("0.0.0.0", 5672, null, "guest", "guest", false);
- Session ssn = conn.createSession();
- ssn.setSessionListener(new ToyClient());
-
- ssn.queueDeclare("asdf", null, null);
- ssn.sync();
-
- Map<String,Object> nested = new LinkedHashMap<String,Object>();
- nested.put("list", Arrays.asList("one", "two", "three"));
- Map<String,Object> map = new LinkedHashMap<String,Object>();
-
- map.put("str", "this is a string");
-
- map.put("+int", 3);
- map.put("-int", -3);
- map.put("maxint", Integer.MAX_VALUE);
- map.put("minint", Integer.MIN_VALUE);
-
- map.put("+short", (short) 1);
- map.put("-short", (short) -1);
- map.put("maxshort", (short) Short.MAX_VALUE);
- map.put("minshort", (short) Short.MIN_VALUE);
-
- map.put("float", (float) 3.3);
- map.put("double", 4.9);
- map.put("char", 'c');
-
- map.put("table", nested);
- map.put("list", Arrays.asList(1, 2, 3));
- map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
-
- ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(new DeliveryProperties(),
- new MessageProperties()
- .setApplicationHeaders(map)),
- "this is the data");
-
- ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED,
- null,
- "this should be rejected");
- ssn.sync();
-
- Future<QueueQueryResult> future = ssn.queueQuery("asdf");
- System.out.println(future.get().getQueue());
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
deleted file mode 100644
index da6aed9629..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.qpid;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.qpid.transport.MessageTransfer;
-
-
-public class ToyExchange
-{
- final static String DIRECT = "amq.direct";
- final static String TOPIC = "amq.topic";
-
- private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
- private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
- private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>();
-
- public void createQueue(String name)
- {
- queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
- }
-
- public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
- {
- return queues.get(name);
- }
-
- public void bindQueue(String type,String binding,String queueName)
- {
- LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
- binding = normalizeKey(binding);
- if(DIRECT.equals(type))
- {
-
- if (directEx.containsKey(binding))
- {
- List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
- list.add(queue);
- }
- else
- {
- List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
- list.add(queue);
- directEx.put(binding,list);
- }
- }
- else
- {
- if (topicEx.containsKey(binding))
- {
- List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
- list.add(queue);
- }
- else
- {
- List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
- list.add(queue);
- topicEx.put(binding,list);
- }
- }
- }
-
- public boolean route(String dest, String routingKey, MessageTransfer msg)
- {
- List<LinkedBlockingQueue<MessageTransfer>> queues;
- if(DIRECT.equals(dest))
- {
- queues = directEx.get(routingKey);
- }
- else
- {
- queues = matchWildCard(routingKey);
- }
- if(queues != null && queues.size()>0)
- {
- System.out.println("Message stored in " + queues.size() + " queues");
- storeMessage(msg,queues);
- return true;
- }
- else
- {
- System.out.println("Message unroutable " + msg);
- return false;
- }
- }
-
- private String normalizeKey(String routingKey)
- {
- if(routingKey.indexOf(".*")>1)
- {
- return routingKey.substring(0,routingKey.indexOf(".*"));
- }
- else
- {
- return routingKey;
- }
- }
-
- private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
- {
- List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
-
- for(String key: topicEx.keySet())
- {
- Pattern p = Pattern.compile(key);
- Matcher m = p.matcher(routingKey);
- if (m.find())
- {
- for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
- {
- selected.add(queue);
- }
- }
- }
-
- return selected;
- }
-
- private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
- {
- for(LinkedBlockingQueue<MessageTransfer> queue : selected)
- {
- queue.offer(msg);
- }
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 0dd21238a7..5fa14cf103 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -86,35 +86,6 @@ public class ClientProperties
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
- /**
- * ==========================================================
- * Those properties are used when the io size should be bounded
- * ==========================================================
- */
-
- /**
- * When set to true the io layer throttle down producers and consumers
- * when written or read messages are accumulating and exceeding a certain size.
- * This is especially useful when a the producer rate is greater than the network
- * speed.
- * type: boolean
- */
- public static final String PROTECTIO_PROP_NAME = "protectio";
-
- //=== The following properties are only used when the previous one is true.
- /**
- * Max size of read messages that can be stored within the MINA layer
- * type: int
- */
- public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
- /**
- * Max size of written messages that can be stored within the MINA layer
- * type: int
- */
- public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
-
public static final String AMQP_VERSION = "qpid.amqp.version";
private static ClientProperties _instance = new ClientProperties();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
deleted file mode 100644
index ecc5f6d07c..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package org.apache.qpid.transport.network.io;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQMethodBodyFactory;
-import org.apache.qpid.framing.BodyFactory;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentBodyFactory;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderBodyFactory;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.HeartbeatBodyFactory;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.transport.Receiver;
-
-public class InputHandler_0_9 implements Receiver<ByteBuffer>
-{
-
- private AMQVersionAwareProtocolSession _session;
- private MethodRegistry _registry;
- private BodyFactory bodyFactory;
- private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
-
- static
- {
- _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
- _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
- _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
- }
-
- public InputHandler_0_9(AMQVersionAwareProtocolSession session)
- {
- _session = session;
- _registry = _session.getMethodRegistry();
- }
-
- public void closed()
- {
- // AS FIXME: implement
- }
-
- public void exception(Throwable t)
- {
- // TODO: propogate exception to things
- t.printStackTrace();
- }
-
- public void received(ByteBuffer buf)
- {
- org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
- try
- {
- final byte type = in.get();
- if (type == AMQMethodBody.TYPE)
- {
- bodyFactory = new AMQMethodBodyFactory(_session);
- }
- else
- {
- bodyFactory = _bodiesSupported[type];
- }
-
- if (bodyFactory == null)
- {
- throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
- }
-
- final int channel = in.getUnsignedShort();
- final long bodySize = in.getUnsignedInt();
-
- // bodySize can be zero
- if ((channel < 0) || (bodySize < 0))
- {
- throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
- + " bodySize = " + bodySize, null);
- }
-
- AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
-
- byte marker = in.get();
- if ((marker & 0xFF) != 0xCE)
- {
- throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
- + " type=" + type, null);
- }
-
- try
- {
- frame.getBodyFrame().handle(frame.getChannel(), _session);
- }
- catch (AMQException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- catch (AMQFrameDecodingException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index bfdbb34978..f261111777 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -19,25 +19,16 @@
package org.apache.qpid.transport.network.io;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketException;
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.util.Logger;
@@ -134,82 +125,6 @@ public final class IoTransport<E> implements IoContext
return socket;
}
- public static final <E> E connect(String host, int port,
- Binding<E,ByteBuffer> binding,
- boolean ssl)
- {
- Socket socket = createSocket(host, port);
- IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
- return transport.endpoint;
- }
-
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate,
- boolean ssl)
- {
- return connect(host, port, ConnectionBinding.get(delegate),ssl);
- }
-
- public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl)
- {
- connect(host, port, new Binding_0_9(session),ssl);
- }
-
- private static class Binding_0_9
- implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
- {
-
- private AMQVersionAwareProtocolSession session;
-
- Binding_0_9(AMQVersionAwareProtocolSession session)
- {
- this.session = session;
- }
-
- public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
- {
- session.setSender(sender);
- return session;
- }
-
- public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
- {
- return new InputHandler_0_9(ssn);
- }
-
- }
-
- private static Socket createSocket(String host, int port)
- {
- try
- {
- InetAddress address = InetAddress.getByName(host);
- Socket socket = new Socket();
- socket.setReuseAddress(true);
- socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
-
- log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
-
- socket.setSendBufferSize(writeBufferSize);
- socket.setReceiveBufferSize(readBufferSize);
-
- log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
-
- socket.connect(new InetSocketAddress(address, port));
- return socket;
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- }
-
private SSLContext createSSLContext() throws Exception
{
String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index 0f2c0d0226..2206e0999e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -26,16 +26,11 @@ import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
@@ -66,16 +61,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
ProtocolEngine _protocolEngine;
- private boolean _useNIO = false;
private int _processors = 4;
- private boolean _executorPool = false;
private SSLContextFactory _sslFactory = null;
private IoConnector _socketConnector;
private IoAcceptor _acceptor;
private IoSession _ioSession;
private ProtocolEngineFactory _factory;
- private boolean _protectIO;
- private NetworkDriverConfiguration _config;
private Throwable _lastException;
private boolean _acceptingConnections = false;
@@ -91,21 +82,9 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
+ public MINANetworkDriver(int processors, ProtocolEngine protocolEngine, IoSession session)
{
- _useNIO = useNIO;
_processors = processors;
- _executorPool = executorPool;
- _protectIO = protectIO;
- }
-
- public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO,
- ProtocolEngine protocolEngine, IoSession session)
- {
- _useNIO = useNIO;
- _processors = processors;
- _executorPool = executorPool;
- _protectIO = protectIO;
_protocolEngine = protocolEngine;
_ioSession = session;
_ioSession.setAttachment(_protocolEngine);
@@ -132,17 +111,8 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
_factory = factory;
- _config = config;
- if (_useNIO)
- {
- _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
- new NewThreadExecutor());
- }
- else
- {
- _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
- }
+ _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)"));
@@ -207,15 +177,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_sslFactory = sslFactory;
}
- if (_useNIO)
- {
- _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
- }
- else
- {
- _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking
- // connector
- }
+ _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
String s = "";
@@ -351,39 +313,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
// Configure the session with SSL if necessary
SessionUtil.initialize(protocolSession);
- if (_executorPool)
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
- }
- else
- {
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
- }
- // Do we want to have read/write buffer limits?
- if (_protectIO)
- {
- //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
-
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
- writefilter.attach(chain);
-
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
if (_ioSession == null)
@@ -395,7 +328,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
// Set up the protocol engine
ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
- MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+ MINANetworkDriver newDriver = new MINANetworkDriver(_processors, protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
deleted file mode 100644
index b89eed48b0..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import org.apache.mina.common.*;
-
-import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.executor.ExecutorFilter;
-
-import org.apache.qpid.transport.Binding;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.ConnectionBinding;
-
-import org.apache.qpid.transport.util.Logger;
-
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-
-import static org.apache.qpid.transport.util.Functions.*;
-
-/**
- * MinaHandler
- *
- * @author Rafael H. Schloming
- */
-//RA making this public until we sort out the package issues
-public class MinaHandler<E> implements IoHandler
-{
- /** Default buffer size for pending messages reads */
- private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
- /** Default buffer size for pending messages writes */
- private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
- private static final int MAX_RCVBUF = 64*1024;
-
- private static final Logger log = Logger.get(MinaHandler.class);
-
- static
- {
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
- }
-
- private final Binding<E,java.nio.ByteBuffer> binding;
-
- private MinaHandler(Binding<E,java.nio.ByteBuffer> binding)
- {
- this.binding = binding;
- }
-
- public void messageReceived(IoSession ssn, Object obj)
- {
- Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
- ByteBuffer buf = (ByteBuffer) obj;
- try
- {
- attachment.receiver.received(buf.buf());
- }
- catch (Throwable t)
- {
- log.error(t, "exception handling buffer %s", str(buf.buf()));
- throw new RuntimeException(t);
- }
- }
-
- public void messageSent(IoSession ssn, Object obj)
- {
- // do nothing
- }
-
- public void exceptionCaught(IoSession ssn, Throwable e)
- {
- Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
- attachment.receiver.exception(e);
- }
-
- /**
- * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events
- * to an optional protectio
- *
- * @param session The MINA session.
- * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
- */
- public void sessionCreated(IoSession session) throws Exception
- {
- log.debug("Protocol session created for session " + System.identityHashCode(session));
-
- if (Boolean.getBoolean("protectio"))
- {
- try
- {
- //Add IO Protection Filters
- IoFilterChain chain = session.getFilterChain();
-
- session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(
- Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(
- Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
- writefilter.attach(chain);
- session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
- log.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- }
-
- public void sessionOpened(final IoSession ssn)
- {
- log.debug("opened: %s", this);
- E endpoint = binding.endpoint(new MinaSender(ssn));
- Attachment<E> attachment =
- new Attachment<E>(endpoint, binding.receiver(endpoint));
-
- // We need to synchronize and notify here because the MINA
- // connect future returns the session prior to the attachment
- // being set. This is arguably a bug in MINA.
- synchronized (ssn)
- {
- ssn.setAttachment(attachment);
- ssn.notifyAll();
- }
- }
-
- public void sessionClosed(IoSession ssn)
- {
- log.debug("closed: %s", ssn);
- Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
- attachment.receiver.closed();
- ssn.setAttachment(null);
- }
-
- public void sessionIdle(IoSession ssn, IdleStatus status)
- {
- // do nothing
- }
-
- private static class Attachment<E>
- {
-
- E endpoint;
- Receiver<java.nio.ByteBuffer> receiver;
-
- Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver)
- {
- this.endpoint = endpoint;
- this.receiver = receiver;
- }
- }
-
- public static final void accept(String host, int port,
- Binding<?,java.nio.ByteBuffer> binding)
- throws IOException
- {
- accept(new InetSocketAddress(host, port), binding);
- }
-
- public static final <E> void accept(SocketAddress address,
- Binding<E,java.nio.ByteBuffer> binding)
- throws IOException
- {
- IoAcceptor acceptor = new SocketAcceptor();
- acceptor.bind(address, new MinaHandler<E>(binding));
- }
-
- public static final <E> E connect(String host, int port,
- Binding<E,java.nio.ByteBuffer> binding)
- {
- return connect(new InetSocketAddress(host, port), binding);
- }
-
- public static final <E> E connect(SocketAddress address,
- Binding<E,java.nio.ByteBuffer> binding)
- {
- MinaHandler<E> handler = new MinaHandler<E>(binding);
- SocketConnector connector = new SocketConnector();
- IoServiceConfig acceptorConfig = connector.getDefaultConfig();
- acceptorConfig.setThreadModel(ThreadModel.MANUAL);
- SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig();
- scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
- Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize");
- if (sendBufferSize != null && sendBufferSize > 0)
- {
- scfg.setSendBufferSize(sendBufferSize);
- }
- Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize");
- if (receiveBufferSize != null && receiveBufferSize > 0)
- {
- scfg.setReceiveBufferSize(receiveBufferSize);
- }
- else if (scfg.getReceiveBufferSize() > MAX_RCVBUF)
- {
- scfg.setReceiveBufferSize(MAX_RCVBUF);
- }
- connector.setWorkerTimeout(0);
- ConnectFuture cf = connector.connect(address, handler);
- cf.join();
- IoSession ssn = cf.getSession();
-
- // We need to synchronize and wait here because the MINA
- // connect future returns the session prior to the attachment
- // being set. This is arguably a bug in MINA.
- synchronized (ssn)
- {
- while (ssn.getAttachment() == null)
- {
- try
- {
- ssn.wait();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- Attachment<E> attachment = (Attachment<E>) ssn.getAttachment();
- return attachment.endpoint;
- }
-
- public static final void accept(String host, int port,
- ConnectionDelegate delegate)
- throws IOException
- {
- accept(host, port, ConnectionBinding.get(delegate));
- }
-
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
- {
- return connect(host, port, ConnectionBinding.get(delegate));
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
deleted file mode 100644
index 22b9c5e784..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.mina;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-
-
-/**
- * MinaSender
- */
-
-public class MinaSender implements Sender<java.nio.ByteBuffer>
-{
- private static final int TIMEOUT = 2 * 60 * 1000;
-
- private final IoSession session;
- private WriteFuture lastWrite = null;
-
- public MinaSender(IoSession session)
- {
- this.session = session;
- }
-
- public void send(java.nio.ByteBuffer buf)
- {
- if (session.isClosing())
- {
- throw new TransportException("attempted to write to a closed socket");
- }
-
- synchronized (this)
- {
- lastWrite = session.write(ByteBuffer.wrap(buf));
- }
- }
-
- public void flush()
- {
- // pass
- }
-
- public synchronized void close()
- {
- // MINA will sometimes throw away in-progress writes when you
- // ask it to close
- synchronized (this)
- {
- if (lastWrite != null)
- {
- lastWrite.join();
- }
- }
- CloseFuture closed = session.close();
- closed.join();
- }
-
- public void setIdleTimeout(int i)
- {
- //noop
- }
-
- public long getIdleTimeout()
- {
- return 0;
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
deleted file mode 100644
index 84e66c25bd..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package org.apache.qpid.transport.network.nio;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-
-public class NioHandler implements Runnable
-{
- private Receiver<ByteBuffer> _receiver;
- private SocketChannel _ch;
- private ByteBuffer _readBuf;
- private static Map<Long,NioSender> _handlers = new ConcurrentHashMap<Long,NioSender>();
-
- private NioHandler(){}
-
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
- {
- NioHandler handler = new NioHandler();
- return handler.connectInternal(host,port,delegate);
- }
-
- private Connection connectInternal(String host, int port,
- ConnectionDelegate delegate)
- {
- try
- {
- SocketAddress address = new InetSocketAddress(host,port);
- _ch = SocketChannel.open();
- _ch.socket().setReuseAddress(true);
- _ch.configureBlocking(true);
- _ch.socket().setTcpNoDelay(true);
- if (address != null)
- {
- _ch.socket().connect(address);
- }
- while (_ch.isConnectionPending())
- {
-
- }
-
- }
- catch (SocketException e)
- {
-
- e.printStackTrace();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- NioSender sender = new NioSender(_ch);
- Connection con = new Connection();
- con.setSender(new Disassembler(sender, 64*1024 - 1));
- con.setConnectionDelegate(delegate);
-
- _handlers.put(con.getConnectionId(),sender);
-
- _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR);
-
- Thread t = new Thread(this);
- t.start();
-
- return con;
- }
-
- public void run()
- {
- _readBuf = ByteBuffer.allocate(512);
- long read = 0;
- while(_ch.isConnected() && _ch.isOpen())
- {
- try
- {
- read = _ch.read(_readBuf);
- if (read > 0)
- {
- _readBuf.flip();
- ByteBuffer b = ByteBuffer.allocate(_readBuf.remaining());
- b.put(_readBuf);
- b.flip();
- _readBuf.clear();
- _receiver.received(b);
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
-
- //throw new EOFException("The underlying socket/channel has closed");
- }
-
- public static void startBatchingFrames(int connectionId)
- {
- NioSender sender = _handlers.get(connectionId);
- sender.setStartBatching();
- }
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
deleted file mode 100644
index 2fa875f279..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package org.apache.qpid.transport.network.nio;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.qpid.transport.Sender;
-
-public class NioSender implements Sender<java.nio.ByteBuffer>
-{
- private final Object lock = new Object();
- private SocketChannel _ch;
- private boolean _batch = false;
- private ByteBuffer _batcher;
-
- public NioSender(SocketChannel ch)
- {
- this._ch = ch;
- }
-
- public void send(java.nio.ByteBuffer buf)
- {
- if (_batch)
- {
- //System.out.println(_batcher.position() + " , " + buf.remaining() + " , " + buf.position() + ","+_batcher.capacity());
- if (_batcher.position() + buf.remaining() >= _batcher.capacity())
- {
- _batcher.flip();
- write(_batcher);
- _batcher.clear();
- if (buf.remaining() > _batcher.capacity())
- {
- write(buf);
- }
- else
- {
- _batcher.put(buf);
- }
- }
- else
- {
- _batcher.put(buf);
- }
- }
- else
- {
- write(buf);
- }
- }
-
- public void flush()
- {
- // pass
- }
-
- private void write(java.nio.ByteBuffer buf)
- {
- synchronized (lock)
- {
- if( _ch.isConnected() && _ch.isOpen())
- {
- try
- {
- _ch.write(buf);
- }
- catch(Exception e)
- {
- e.fillInStackTrace();
- }
- }
- else
- {
- throw new RuntimeException("Trying to write on a closed socket");
- }
-
- }
- }
-
- public void setStartBatching()
- {
- _batch = true;
- _batcher = ByteBuffer.allocate(1024);
- }
-
- public void close()
- {
- // MINA will sometimes throw away in-progress writes when you
- // ask it to close
- synchronized (lock)
- {
- try
- {
- _ch.close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-
- public void setIdleTimeout(int i)
- {
- //noop
- }
-}
diff --git a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
deleted file mode 100644
index b93dc46741..0000000000
--- a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.SocketIOTest;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-
-public class IOWriterClient implements Runnable
-{
- private static final Logger _logger = LoggerFactory.getLogger(IOWriterClient.class);
-
- public static int DEFAULT_TEST_SIZE = 2;
-
- private IoSession _session;
-
- private long _startTime;
-
- private long[] _chunkTimes;
-
- public int _chunkCount = 200000;
-
- private int _chunkSize = 1024;
-
- private CountDownLatch _notifier;
-
- private int _maximumWriteQueueLength;
-
- static public int _PORT = IOWriterServer._PORT;
-
- public void run()
- {
- _logger.info("Starting to send " + _chunkCount + " buffers of " + _chunkSize + "B");
- _startTime = System.currentTimeMillis();
- _notifier = new CountDownLatch(1);
-
- for (int i = 0; i < _chunkCount; i++)
- {
- ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false);
- byte check = (byte) (i % 128);
- buf.put(check);
- buf.fill((byte) 88, buf.remaining());
- buf.flip();
-
- _session.write(buf);
- }
-
- long _sentall = System.currentTimeMillis();
- long _receivedall = _sentall;
- try
- {
- _logger.info("All buffers sent; waiting for receipt from server");
- _notifier.await();
- _receivedall = System.currentTimeMillis();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- _logger.info("Completed");
- _logger.info("Total time waiting for server after last write: " + (_receivedall - _sentall));
-
- long totalTime = System.currentTimeMillis() - _startTime;
-
- _logger.info("Total time: " + totalTime);
- _logger.info("MB per second: " + (int) ((1.0 * _chunkSize * _chunkCount) / totalTime));
- long lastChunkTime = _startTime;
- double average = 0;
- for (int i = 0; i < _chunkTimes.length; i++)
- {
- if (i == 0)
- {
- average = _chunkTimes[i] - _startTime;
- }
- else
- {
- long delta = _chunkTimes[i] - lastChunkTime;
- if (delta != 0)
- {
- average = (average + delta) / 2;
- }
- }
- lastChunkTime = _chunkTimes[i];
- }
- _logger.info("Average chunk time: " + average + "ms");
- _logger.info("Maximum WriteRequestQueue size: " + _maximumWriteQueueLength);
-
- CloseFuture cf = _session.close();
- _logger.info("Closing session");
- cf.join();
- }
-
- private class WriterHandler extends IoHandlerAdapter
- {
- private int _chunksReceived = 0;
-
- private int _partialBytesRead = 0;
-
- private byte _partialCheckNumber;
-
- private int _totalBytesReceived = 0;
-
- private int _receivedCount = 0;
- private int _sentCount = 0;
- private static final String DEFAULT_READ_BUFFER = "262144";
- private static final String DEFAULT_WRITE_BUFFER = "262144";
-
- public void sessionCreated(IoSession session) throws Exception
- {
- IoFilterChain chain = session.getFilterChain();
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
-
- writefilter.attach(chain);
- }
-
- public void messageSent(IoSession session, Object message) throws Exception
- {
- _maximumWriteQueueLength = Math.max(session.getScheduledWriteRequests(), _maximumWriteQueueLength);
-
- if (_logger.isDebugEnabled())
- {
- ++_sentCount;
- if (_sentCount % 1000 == 0)
- {
- _logger.debug("Sent count " + _sentCount + ":WQueue" + session.getScheduledWriteRequests());
-
- }
- }
- }
-
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- ++_receivedCount;
-
- if (_receivedCount % 1000 == 0)
- {
- _logger.debug("Receieved count " + _receivedCount);
- }
- }
-
- ByteBuffer result = (ByteBuffer) message;
- _totalBytesReceived += result.remaining();
- int size = result.remaining();
- long now = System.currentTimeMillis();
- if (_partialBytesRead > 0)
- {
- int offset = _chunkSize - _partialBytesRead;
- if (size >= offset)
- {
- _chunkTimes[_chunksReceived++] = now;
- result.position(offset);
- }
- else
- {
- // have not read even one chunk, including the previous partial bytes
- _partialBytesRead += size;
- return;
- }
- }
-
-
- int chunkCount = result.remaining() / _chunkSize;
-
- for (int i = 0; i < chunkCount; i++)
- {
- _chunkTimes[_chunksReceived++] = now;
- byte check = result.get();
- _logger.debug("Check number " + check + " read");
- if (check != (byte) ((_chunksReceived - 1) % 128))
- {
- _logger.error("Check number " + check + " read when expected " + (_chunksReceived % 128));
- }
- _logger.debug("Chunk times recorded");
-
- try
- {
- result.skip(_chunkSize - 1);
- }
- catch (IllegalArgumentException e)
- {
- _logger.error("Position was: " + result.position());
- _logger.error("Tried to skip to: " + (_chunkSize * i));
- _logger.error("limit was; " + result.limit());
- }
- }
- _logger.debug("Chunks received now " + _chunksReceived);
- _logger.debug("Bytes received: " + _totalBytesReceived);
- _partialBytesRead = result.remaining();
-
- if (_partialBytesRead > 0)
- {
- _partialCheckNumber = result.get();
- }
-
-
- if (_chunksReceived >= _chunkCount)
- {
- _notifier.countDown();
- }
-
- }
-
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception
- {
- _logger.error("Error: " + cause, cause);
- }
- }
-
- public void startWriter() throws IOException, InterruptedException
- {
-
- _maximumWriteQueueLength = 0;
-
- IoConnector ioConnector = null;
-
- if (Boolean.getBoolean("multinio"))
- {
- _logger.warn("Using MultiThread NIO");
- ioConnector = new org.apache.mina.transport.socket.nio.MultiThreadSocketConnector();
- }
- else
- {
- _logger.warn("Using MINA NIO");
- ioConnector = new org.apache.mina.transport.socket.nio.SocketConnector();
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getDefaultConfig().getSessionConfig();
- scfg.setTcpNoDelay(true);
- scfg.setSendBufferSize(32768);
- scfg.setReceiveBufferSize(32768);
-
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
-
-
- final InetSocketAddress address = new InetSocketAddress("localhost", _PORT);
- _logger.info("Attempting connection to " + address);
-
- //Old mina style
-// ioConnector.setHandler(new WriterHandler());
-// ConnectFuture future = ioConnector.connect(address);
- ConnectFuture future = ioConnector.connect(address, new WriterHandler());
- // wait for connection to complete
- future.join();
- _logger.info("Connection completed");
- // we call getSession which throws an IOException if there has been an error connecting
- _session = future.getSession();
-
- _chunkTimes = new long[_chunkCount];
- Thread t = new Thread(this);
- t.start();
- t.join();
- _logger.info("Test Complete");
- }
-
-
- public void test1k() throws IOException, InterruptedException
- {
- _logger.info("Starting 1k test");
- _chunkSize = 1024;
- startWriter();
- }
-
-
- public void test2k() throws IOException, InterruptedException
- {
- _logger.info("Starting 2k test");
- _chunkSize = 2048;
- startWriter();
- }
-
-
- public void test4k() throws IOException, InterruptedException
- {
- _logger.info("Starting 4k test");
- _chunkSize = 4096;
- startWriter();
- }
-
-
- public void test8k() throws IOException, InterruptedException
- {
- _logger.info("Starting 8k test");
- _chunkSize = 8192;
- startWriter();
- }
-
-
- public void test16k() throws IOException, InterruptedException
- {
- _logger.info("Starting 16k test");
- _chunkSize = 16384;
- startWriter();
- }
-
-
- public void test32k() throws IOException, InterruptedException
- {
- _logger.info("Starting 32k test");
- _chunkSize = 32768;
- startWriter();
- }
-
-
- public static int getIntArg(String[] args, int index, int defaultValue)
- {
- if (args.length > index)
- {
- try
- {
- return Integer.parseInt(args[index]);
- }
- catch (NumberFormatException e)
- {
- //Do nothing
- }
- }
- return defaultValue;
- }
-
- public static void main(String[] args) throws IOException, InterruptedException
- {
- _PORT = getIntArg(args, 0, _PORT);
-
- int test = getIntArg(args, 1, DEFAULT_TEST_SIZE);
-
- IOWriterClient w = new IOWriterClient();
- w._chunkCount = getIntArg(args, 2, w._chunkCount);
- switch (test)
- {
- case 0:
- w.test1k();
- w.test2k();
- w.test4k();
- w.test8k();
- w.test16k();
- w.test32k();
- break;
- case 1:
- w.test1k();
- break;
- case 2:
- w.test2k();
- break;
- case 4:
- w.test4k();
- break;
- case 8:
- w.test8k();
- break;
- case 16:
- w.test16k();
- break;
- case 32:
- w.test32k();
- break;
- }
- }
-}
diff --git a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
deleted file mode 100644
index 423e98c67b..0000000000
--- a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.SocketIOTest;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/** Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again. */
-public class IOWriterServer
-{
- private static final Logger _logger = LoggerFactory.getLogger(IOWriterServer.class);
-
- static public int _PORT = 9999;
-
- private static final String DEFAULT_READ_BUFFER = "262144";
- private static final String DEFAULT_WRITE_BUFFER = "262144";
-
-
- private static class TestHandler extends IoHandlerAdapter
- {
- private int _sentCount = 0;
-
- private int _bytesSent = 0;
-
- private int _receivedCount = 0;
-
- public void sessionCreated(IoSession ioSession) throws java.lang.Exception
- {
- IoFilterChain chain = ioSession.getFilterChain();
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
-
- writefilter.attach(chain);
-
- }
-
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- ((ByteBuffer) message).acquire();
- session.write(message);
-
- if (_logger.isDebugEnabled())
- {
- _bytesSent += ((ByteBuffer) message).remaining();
-
- _sentCount++;
-
- if (_sentCount % 1000 == 0)
- {
- _logger.debug("Bytes sent: " + _bytesSent);
- }
- }
- }
-
- public void messageSent(IoSession session, Object message) throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- ++_receivedCount;
-
- if (_receivedCount % 1000 == 0)
- {
- _logger.debug("Receieved count " + _receivedCount);
- }
- }
- }
-
- public void exceptionCaught(IoSession session, Throwable cause) throws Exception
- {
- _logger.error("Error: " + cause, cause);
- }
- }
-
- public void startAcceptor() throws IOException
- {
- IoAcceptor acceptor;
- if (Boolean.getBoolean("multinio"))
- {
- _logger.warn("Using MultiThread NIO");
- acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor();
- }
- else
- {
- _logger.warn("Using MINA NIO");
- acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor();
- }
-
-
- SocketSessionConfig sc = (SocketSessionConfig) acceptor.getDefaultConfig().getSessionConfig();
- sc.setTcpNoDelay(true);
- sc.setSendBufferSize(32768);
- sc.setReceiveBufferSize(32768);
-
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
-
- //The old mina style
-// acceptor.setLocalAddress(new InetSocketAddress(_PORT));
-// acceptor.setHandler(new TestHandler());
-// acceptor.bind();
- acceptor.bind(new InetSocketAddress(_PORT), new TestHandler());
-
- _logger.info("Bound on port " + _PORT + ":" + _logger.isDebugEnabled());
- _logger.debug("debug on");
- }
-
- public static void main(String[] args) throws IOException
- {
-
- if (args.length > 0)
- {
- try
- {
- _PORT = Integer.parseInt(args[0]);
- }
- catch (NumberFormatException e)
- {
- //IGNORE so use default port 9999;
- }
- }
-
- IOWriterServer a = new IOWriterServer();
- a.startAcceptor();
- }
-}
diff --git a/qpid/java/systests/etc/config-systests-firewall-2.xml b/qpid/java/systests/etc/config-systests-firewall-2.xml
index 4c1bf9a800..2eedd65d54 100644
--- a/qpid/java/systests/etc/config-systests-firewall-2.xml
+++ b/qpid/java/systests/etc/config-systests-firewall-2.xml
@@ -35,17 +35,10 @@
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>false</enabled>
diff --git a/qpid/java/systests/etc/config-systests-firewall-3.xml b/qpid/java/systests/etc/config-systests-firewall-3.xml
index 19aaec9e54..fc7d9a4c76 100644
--- a/qpid/java/systests/etc/config-systests-firewall-3.xml
+++ b/qpid/java/systests/etc/config-systests-firewall-3.xml
@@ -35,17 +35,10 @@
<keystorePath>/path/to/keystore.ks</keystorePath>
<keystorePassword>keystorepass</keystorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>false</enabled>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
index d4c550bc08..6d379e14d8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java
@@ -61,21 +61,6 @@ public class ServerConfigurationFileTest extends QpidBrokerTestCase
_serverConfig.getConfig().getProperty(property));
}
- public void testProtectIOEnabled() throws ConfigurationException
- {
- validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED);
- }
-
- public void testProtectIOReadBufferLimitSize() throws ConfigurationException
- {
- validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE);
- }
-
- public void testProtectIOWriteBufferLimitSize() throws ConfigurationException
- {
- validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE);
- }
-
public void testStatusUpdates() throws ConfigurationException
{
validatePropertyDefinedInFile(ServerConfiguration.STATUS_UPDATES);