diff options
author | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:08:44 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:08:44 +0000 |
commit | bfd022e2307b6628ee63e316f042ffb9b85300f7 (patch) | |
tree | 8c15d0ed1d363e0c44f9698ce3b6852fe71743b4 | |
parent | 122b2d411f119e4b46b77f20dc5002981db204a8 (diff) | |
download | qpid-python-bfd022e2307b6628ee63e316f042ffb9b85300f7.tar.gz |
QPID-3341: remove unused/dead transport code and accompanying implementation classes
Applied patch by Keith Wall and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1143865 13f79535-47bb-0310-9956-ffa450edef68
30 files changed, 26 insertions, 5316 deletions
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml index c0f9b4df61..f4758d77a8 100644 --- a/qpid/java/broker/etc/config.xml +++ b/qpid/java/broker/etc/config.xml @@ -37,17 +37,10 @@ <keystorePath>/path/to/keystore.ks</keystorePath> <keystorePassword>keystorepass</keystorePassword> </ssl> - <qpidnio>false</qpidnio> - <protectio> - <enabled>false</enabled> - <readBufferLimitSize>262144</readBufferLimitSize> - <writeBufferLimitSize>262144</writeBufferLimitSize> - </protectio> - <transport>nio</transport> <port>5672</port> <sslport>8672</sslport> - <socketReceiveBuffer>32768</socketReceiveBuffer> - <socketSendBuffer>32768</socketSendBuffer> + <socketReceiveBuffer>262144</socketReceiveBuffer> + <socketSendBuffer>262144</socketSendBuffer> </connector> <management> <enabled>true</enabled> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index f152865a27..5908eb4bd8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -52,9 +52,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class); // Default Configuration values - public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144; - public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144; - public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED = false; + public static final int DEFAULT_BUFFER_SIZE = 262144; public static final String DEFAULT_STATUS_UPDATES = "on"; public static final String SECURITY_CONFIG_RELOADED = "SECURITY CONFIGURATION RELOADED"; @@ -84,9 +82,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa // Configuration values to be read from the configuration file //todo Move all properties to static values to ensure system testing can be performed. - public static final String CONNECTOR_PROTECTIO_ENABLED = "connector.protectio.enabled"; - public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE = "connector.protectio.readBufferLimitSize"; - public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE = "connector.protectio.writeBufferLimitSize"; public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket"; public static final String STATUS_UPDATES = "status-updates"; public static final String ADVANCED_LOCALE = "advanced.locale"; @@ -95,7 +90,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa envVarMap.put("QPID_PORT", "connector.port"); envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers"); envVarMap.put("QPID_SSLPORT", "connector.ssl.port"); - envVarMap.put("QPID_NIO", "connector.qpidnio"); envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool"); envVarMap.put("QPID_JMXPORT", "management.jmxport"); envVarMap.put("QPID_FRAMESIZE", "advanced.framesize"); @@ -545,21 +539,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE); } - public boolean getProtectIOEnabled() - { - return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED, DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED); - } - - public int getBufferReadLimit() - { - return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_READ_LIMIT_SIZE); - } - - public int getBufferWriteLimit() - { - return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_WRITE_LIMIT_SIZE); - } - public boolean getSynchedClocks() { return getBooleanValue("advanced.synced-clocks"); @@ -687,12 +666,12 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public int getReceiveBufferSize() { - return getIntValue("connector.socketReceiveBuffer", 32767); + return getIntValue("connector.socketReceiveBuffer", DEFAULT_BUFFER_SIZE); } public int getWriteBufferSize() { - return getIntValue("connector.socketWriteBuffer", 32767); + return getIntValue("connector.socketWriteBuffer", DEFAULT_BUFFER_SIZE); } public boolean getTcpNoDelay() @@ -735,11 +714,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getStringValue("connector.ssl.certType", "SunX509"); } - public boolean getQpidNIO() - { - return getBooleanValue("connector.qpidnio"); - } - public boolean getUseBiasedWrites() { return getBooleanValue("advanced.useWriteBiasedPool"); @@ -809,8 +783,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public Boolean getTcpNoDelay() { - // Can't call parent getTcpNoDelay since it just calls this one - return getBooleanValue("connector.tcpNoDelay", true); + return ServerConfiguration.this.getTcpNoDelay(); } public Integer getSoTimeout() @@ -825,7 +798,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public Integer getSendBufferSize() { - return getBufferWriteLimit(); + return ServerConfiguration.this.getWriteBufferSize(); } public Boolean getReuseAddress() @@ -835,7 +808,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public Integer getReceiveBufferSize() { - return getBufferReadLimit(); + return ServerConfiguration.this.getReceiveBufferSize(); } public Boolean getOOBInline() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 494003c8a0..484f93cb88 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -187,49 +187,6 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase assertEquals(23, serverConfig.getFrameSize()); } - public void testGetProtectIOEnabled() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getProtectIOEnabled()); - - // Check value we set - _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED, true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getProtectIOEnabled()); - } - - public void testGetBufferReadLimit() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(262144, serverConfig.getBufferReadLimit()); - - // Check value we set - _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, 23); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getBufferReadLimit()); - } - - public void testGetBufferWriteLimit() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(262144, serverConfig.getBufferWriteLimit()); - - // Check value we set - _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, 23); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getBufferWriteLimit()); - } - - public void testGetStatusEnabled() throws ConfigurationException { // Check default @@ -543,7 +500,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals(32767, serverConfig.getReceiveBufferSize()); + assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE, serverConfig.getReceiveBufferSize()); // Check value we set _config.setProperty("connector.socketReceiveBuffer", "23"); @@ -557,7 +514,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals(32767, serverConfig.getWriteBufferSize()); + assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE, serverConfig.getWriteBufferSize()); // Check value we set _config.setProperty("connector.socketWriteBuffer", "23"); @@ -678,20 +635,6 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase assertEquals("a", serverConfig.getCertType()); } - public void testGetQpidNIO() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getQpidNIO()); - - // Check value we set - _config.setProperty("connector.qpidnio", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getQpidNIO()); - } - public void testGetUseBiasedWrites() throws ConfigurationException { // Check default @@ -756,7 +699,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.close(); out = new FileWriter(fileB); - out.write("<broker><connector><ssl><port>2345</port></ssl><qpidnio>true</qpidnio></connector></broker>"); + out.write("<broker><connector><ssl><port>2345</port></ssl></connector></broker>"); out.close(); ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile()); @@ -767,8 +710,6 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase assertEquals(1, config.getPorts().size()); assertEquals("2342", config.getPorts().get(0)); // From the first file, not // present in the second - assertEquals(true, config.getQpidNIO()); // From the second file, not - // present in the first } public void testVariableInterpolation() throws Exception diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 40b332d216..98573c9cc3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -89,15 +89,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - // TODO: use system property thingy for this - if (System.getProperty("UseTransportIo", "false").equals("false")) - { - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); - } - else - { - _conn.getProtocolHandler().createIoTransportSession(brokerDetail); - } + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index eb5af119b2..424e09693f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -211,21 +211,6 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * Called when we want to create a new IoTransport session - * @param brokerDetail - */ - public void createIoTransportSession(BrokerDetails brokerDetail) - { - _protocolSession = new AMQProtocolSession(this, _connection); - _stateManager.setProtocolSession(_protocolSession); - IoTransport.connect_0_9(getProtocolSession(), - brokerDetail.getHost(), - brokerDetail.getPort(), - brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); - _protocolSession.init(); - } - - /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index aef3a563af..97657a09f4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -30,7 +30,6 @@ import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; @@ -116,20 +115,8 @@ public class TransportConnection { public IoConnector newSocketConnector() { - SocketConnector result; - // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) - { - _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") - ? "Qpid NIO is new default" - : "Sysproperty 'qpidnio' is set")); - result = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); - } - else - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - } + SocketConnector result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector + // Don't have the connector's worker thread wait around for other connections (we only use // one SocketConnector per connection at the moment anyway). This allows short-running // clients (like unit tests) to complete quickly. diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java deleted file mode 100644 index 47f19aa76d..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferFullExeception.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.mina.filter; - -import org.apache.mina.common.IoFilter;/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -public class WriteBufferFullExeception extends RuntimeException -{ - private IoFilter.WriteRequest _writeRequest; - - public WriteBufferFullExeception() - { - this(null); - } - - public WriteBufferFullExeception(IoFilter.WriteRequest writeRequest) - { - _writeRequest = writeRequest; - } - - - public void setWriteRequest(IoFilter.WriteRequest writeRequest) - { - _writeRequest = writeRequest; - } - - public IoFilter.WriteRequest getWriteRequest() - { - return _writeRequest; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java b/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java deleted file mode 100644 index 4e9db9071a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/filter/WriteBufferLimitFilterBuilder.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.filter; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.DefaultIoFilterChainBuilder; -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.executor.ExecutorFilter; - -import java.util.Iterator; -import java.util.List; - -/** - * This filter will turn the asynchronous filterWrite method in to a blocking send when there are more than - * the prescribed number of messages awaiting filterWrite. It should be used in conjunction with the - * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to - * cause an Out of Memory exception due to a back log of unprocessed messages. - * - * This is should only be viewed as a temporary work around for DIRMINA-302. - * - * A true solution should not be implemented as a filter as this issue will always occur. On a machine - * where the network is slower than the local producer. - * - * Suggested improvement is to allow implementation of policices on what to do when buffer is full. - * - * They could be: - * Block - As this does - * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks - * Throw Exception - through the client filterWrite() method to allow them to get immediate feedback on buffer state - * - * <p/> - * <p>Usage: - * <p/> - * <pre><code> - * DefaultFilterChainBuilder builder = ... - * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); - * filter.attach( builder ); - * </code></pre> - * <p/> - * or - * <p/> - * <pre><code> - * IoFilterChain chain = ... - * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder(); - * filter.attach( chain ); - * </code></pre> - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -public class WriteBufferLimitFilterBuilder -{ - public static final String PENDING_SIZE = WriteBufferLimitFilterBuilder.class.getName() + ".pendingSize"; - - private static int DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT = 5000; - - private volatile boolean throwNotBlock = false; - - private volatile int maximumConnectionBufferCount; - private volatile long maximumConnectionBufferSize; - - private final Object _blockLock = new Object(); - - private int _blockWaiters = 0; - - - public WriteBufferLimitFilterBuilder() - { - this(DEFAULT_CONNECTION_BUFFER_MESSAGE_COUNT); - } - - public WriteBufferLimitFilterBuilder(int maxWriteBufferSize) - { - setMaximumConnectionBufferCount(maxWriteBufferSize); - } - - - /** - * Set the maximum amount pending items in the writeQueue for a given session. - * Changing the value will only take effect when new data is received for a - * connection, including existing connections. Default value is 5000 msgs. - * - * @param maximumConnectionBufferCount New buffer size. Must be > 0 - */ - public void setMaximumConnectionBufferCount(int maximumConnectionBufferCount) - { - this.maximumConnectionBufferCount = maximumConnectionBufferCount; - this.maximumConnectionBufferSize = 0; - } - - public void setMaximumConnectionBufferSize(long maximumConnectionBufferSize) - { - this.maximumConnectionBufferSize = maximumConnectionBufferSize; - this.maximumConnectionBufferCount = 0; - } - - /** - * Attach this filter to the specified filter chain. It will search for the ThreadPoolFilter, and attach itself - * before and after that filter. - * - * @param chain {@link IoFilterChain} to attach self to. - */ - public void attach(IoFilterChain chain) - { - String name = getThreadPoolFilterEntryName(chain.getAll()); - - chain.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); - } - - /** - * Attach this filter to the specified builder. It will search for the - * {@link ExecutorFilter}, and attach itself before and after that filter. - * - * @param builder {@link DefaultIoFilterChainBuilder} to attach self to. - */ - public void attach(DefaultIoFilterChainBuilder builder) - { - String name = getThreadPoolFilterEntryName(builder.getAll()); - - builder.addBefore(name, getClass().getName() + ".sendlimit", new SendLimit()); - } - - private String getThreadPoolFilterEntryName(List entries) - { - Iterator i = entries.iterator(); - - while (i.hasNext()) - { - IoFilterChain.Entry entry = (IoFilterChain.Entry) i.next(); - - if (entry.getFilter().getClass().isAssignableFrom(ExecutorFilter.class)) - { - return entry.getName(); - } - } - - throw new IllegalStateException("Chain does not contain a ExecutorFilter"); - } - - - public class SendLimit extends IoFilterAdapter - { - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception - { - try - { - waitTillSendAllowed(session); - } - catch (WriteBufferFullExeception wbfe) - { - nextFilter.exceptionCaught(session, wbfe); - } - - if (writeRequest.getMessage() instanceof ByteBuffer) - { - increasePendingWriteSize(session, (ByteBuffer) writeRequest.getMessage()); - } - - nextFilter.filterWrite(session, writeRequest); - } - - private void increasePendingWriteSize(IoSession session, ByteBuffer message) - { - synchronized (session) - { - Long pendingSize = getScheduledWriteBytes(session) + message.remaining(); - session.setAttribute(PENDING_SIZE, pendingSize); - } - } - - private boolean sendAllowed(IoSession session) - { - if (session.isClosing()) - { - return true; - } - - int lmswm = maximumConnectionBufferCount; - long lmswb = maximumConnectionBufferSize; - - return (lmswm == 0 || session.getScheduledWriteRequests() < lmswm) - && (lmswb == 0 || getScheduledWriteBytes(session) < lmswb); - } - - private long getScheduledWriteBytes(IoSession session) - { - synchronized (session) - { - Long i = (Long) session.getAttribute(PENDING_SIZE); - return null == i ? 0 : i; - } - } - - private void waitTillSendAllowed(IoSession session) - { - synchronized (_blockLock) - { - if (throwNotBlock) - { - throw new WriteBufferFullExeception(); - } - - _blockWaiters++; - - while (!sendAllowed(session)) - { - try - { - _blockLock.wait(); - } - catch (InterruptedException e) - { - // Ignore. - } - } - _blockWaiters--; - } - } - - public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception - { - if (message instanceof ByteBuffer) - { - decrementPendingWriteSize(session, (ByteBuffer) message); - } - notifyWaitingWriters(); - nextFilter.messageSent(session, message); - } - - private void decrementPendingWriteSize(IoSession session, ByteBuffer message) - { - synchronized (session) - { - session.setAttribute(PENDING_SIZE, getScheduledWriteBytes(session) - message.remaining()); - } - } - - private void notifyWaitingWriters() - { - synchronized (_blockLock) - { - if (_blockWaiters != 0) - { - _blockLock.notifyAll(); - } - } - - } - - }//SentLimit - - -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java deleted file mode 100644 index e5360d32e0..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketAcceptor.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.BaseIoAcceptor; -import org.apache.mina.util.Queue; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.NamePreservingRunnable; -import edu.emory.mathcs.backport.java.util.concurrent.Executor; - -/** - * {@link IoAcceptor} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -public class MultiThreadSocketAcceptor extends SocketAcceptor -{ - /** - * @noinspection StaticNonFinalField - */ - private static volatile int nextId = 0; - - private final Executor executor; - private final Object lock = new Object(); - private final int id = nextId ++; - private final String threadName = "SocketAcceptor-" + id; - private final Map channels = new HashMap(); - - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); - - private final MultiThreadSocketIoProcessor[] ioProcessors; - private final int processorCount; - - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ - private Selector selector; - private Worker worker; - private int processorDistributor = 0; - - /** - * Create an acceptor with a single processing thread using a NewThreadExecutor - */ - public MultiThreadSocketAcceptor() - { - this( 1, new NewThreadExecutor() ); - } - - /** - * Create an acceptor with the desired number of processing threads - * - * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads - */ - public MultiThreadSocketAcceptor( int processorCount, Executor executor ) - { - if( processorCount < 1 ) - { - throw new IllegalArgumentException( "Must have at least one processor" ); - } - - this.executor = executor; - this.processorCount = processorCount; - ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; - - for( int i = 0; i < processorCount; i++ ) - { - ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor ); - } - } - - - /** - * Binds to the specified <code>address</code> and handles incoming connections with the specified - * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. - * - * @throws IOException if failed to bind - */ - public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException - { - if( handler == null ) - { - throw new NullPointerException( "handler" ); - } - - if( address != null && !( address instanceof InetSocketAddress ) ) - { - throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); - } - - if( config == null ) - { - config = getDefaultConfig(); - } - - RegistrationRequest request = new RegistrationRequest( address, handler, config ); - - synchronized( registerQueue ) - { - registerQueue.push( request ); - } - - startupWorker(); - - selector.wakeup(); - - synchronized( request ) - { - while( !request.done ) - { - try - { - request.wait(); - } - catch( InterruptedException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - } - } - - if( request.exception != null ) - { - throw request.exception; - } - } - - - private synchronized void startupWorker() throws IOException - { - synchronized( lock ) - { - if( worker == null ) - { - selector = Selector.open(); - worker = new Worker(); - - executor.execute( new NamePreservingRunnable( worker ) ); - } - } - } - - public void unbind( SocketAddress address ) - { - if( address == null ) - { - throw new NullPointerException( "address" ); - } - - CancellationRequest request = new CancellationRequest( address ); - - try - { - startupWorker(); - } - catch( IOException e ) - { - // IOException is thrown only when Worker thread is not - // running and failed to open a selector. We simply throw - // IllegalArgumentException here because we can simply - // conclude that nothing is bound to the selector. - throw new IllegalArgumentException( "Address not bound: " + address ); - } - - synchronized( cancelQueue ) - { - cancelQueue.push( request ); - } - - selector.wakeup(); - - synchronized( request ) - { - while( !request.done ) - { - try - { - request.wait(); - } - catch( InterruptedException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - } - } - - if( request.exception != null ) - { - request.exception.fillInStackTrace(); - - throw request.exception; - } - } - - - private class Worker implements Runnable - { - public void run() - { - Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName ); - - for( ; ; ) - { - try - { - int nKeys = selector.select(); - - registerNew(); - - if( nKeys > 0 ) - { - processSessions( selector.selectedKeys() ); - } - - cancelKeys(); - - if( selector.keys().isEmpty() ) - { - synchronized( lock ) - { - if( selector.keys().isEmpty() && - registerQueue.isEmpty() && - cancelQueue.isEmpty() ) - { - worker = null; - try - { - selector.close(); - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - finally - { - selector = null; - } - break; - } - } - } - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - - try - { - Thread.sleep( 1000 ); - } - catch( InterruptedException e1 ) - { - ExceptionMonitor.getInstance().exceptionCaught( e1 ); - } - } - } - } - - private void processSessions( Set keys ) throws IOException - { - Iterator it = keys.iterator(); - while( it.hasNext() ) - { - SelectionKey key = ( SelectionKey ) it.next(); - - it.remove(); - - if( !key.isAcceptable() ) - { - continue; - } - - ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); - - SocketChannel ch = ssc.accept(); - - if( ch == null ) - { - continue; - } - - boolean success = false; - try - { - - RegistrationRequest req = ( RegistrationRequest ) key.attachment(); - - MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl( - MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(), - req.config, ch, req.handler, req.address ); - - // New Interface -// SocketSessionImpl session = new SocketSessionImpl( -// SocketAcceptor.this, nextProcessor(), getListeners(), -// req.config, ch, req.handler, req.address ); - - - getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); - req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); - req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); - session.getIoProcessor().addNew( session ); - success = true; - } - catch( Throwable t ) - { - ExceptionMonitor.getInstance().exceptionCaught( t ); - } - finally - { - if( !success ) - { - ch.close(); - } - } - } - } - } - - private MultiThreadSocketIoProcessor nextProcessor() - { - return ioProcessors[processorDistributor++ % processorCount]; - } - - - private void registerNew() - { - if( registerQueue.isEmpty() ) - { - return; - } - - for( ; ; ) - { - RegistrationRequest req; - - synchronized( registerQueue ) - { - req = ( RegistrationRequest ) registerQueue.pop(); - } - - if( req == null ) - { - break; - } - - ServerSocketChannel ssc = null; - - try - { - ssc = ServerSocketChannel.open(); - ssc.configureBlocking( false ); - - // Configure the server socket, - SocketAcceptorConfig cfg; - if( req.config instanceof SocketAcceptorConfig ) - { - cfg = ( SocketAcceptorConfig ) req.config; - } - else - { - cfg = ( SocketAcceptorConfig ) getDefaultConfig(); - } - - ssc.socket().setReuseAddress( cfg.isReuseAddress() ); - ssc.socket().setReceiveBufferSize( - ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() ); - - // and bind. - ssc.socket().bind( req.address, cfg.getBacklog() ); - if( req.address == null || req.address.getPort() == 0 ) - { - req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress(); - } - ssc.register( selector, SelectionKey.OP_ACCEPT, req ); - - synchronized( channels ) - { - channels.put( req.address, ssc ); - } - - getListeners().fireServiceActivated( - this, req.address, req.handler, req.config ); - } - catch( IOException e ) - { - req.exception = e; - } - finally - { - synchronized( req ) - { - req.done = true; - - req.notifyAll(); - } - - if( ssc != null && req.exception != null ) - { - try - { - ssc.close(); - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - } - } - } - } - - - private void cancelKeys() - { - if( cancelQueue.isEmpty() ) - { - return; - } - - for( ; ; ) - { - CancellationRequest request; - - synchronized( cancelQueue ) - { - request = ( CancellationRequest ) cancelQueue.pop(); - } - - if( request == null ) - { - break; - } - - ServerSocketChannel ssc; - synchronized( channels ) - { - ssc = ( ServerSocketChannel ) channels.remove( request.address ); - } - - // close the channel - try - { - if( ssc == null ) - { - request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); - } - else - { - SelectionKey key = ssc.keyFor( selector ); - request.registrationRequest = ( RegistrationRequest ) key.attachment(); - key.cancel(); - - selector.wakeup(); // wake up again to trigger thread death - - ssc.close(); - } - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught( e ); - } - finally - { - synchronized( request ) - { - request.done = true; - request.notifyAll(); - } - - if( request.exception == null ) - { - getListeners().fireServiceDeactivated( - this, request.address, - request.registrationRequest.handler, - request.registrationRequest.config ); - } - } - } - } - - private static class RegistrationRequest - { - private InetSocketAddress address; - private final IoHandler handler; - private final IoServiceConfig config; - private IOException exception; - private boolean done; - - private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config ) - { - this.address = ( InetSocketAddress ) address; - this.handler = handler; - this.config = config; - } - } - - - private static class CancellationRequest - { - private final SocketAddress address; - private boolean done; - private RegistrationRequest registrationRequest; - private RuntimeException exception; - - private CancellationRequest( SocketAddress address ) - { - this.address = address; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java deleted file mode 100644 index 7344f70078..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketConnector.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoConnectorConfig; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.AbstractIoFilterChain; -import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -/** - * {@link IoConnector} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -public class MultiThreadSocketConnector extends SocketConnector -{ - /** @noinspection StaticNonFinalField */ - private static volatile int nextId = 0; - - private final Object lock = new Object(); - private final int id = nextId++; - private final String threadName = "SocketConnector-" + id; - - private final Queue connectQueue = new Queue(); - private final MultiThreadSocketIoProcessor[] ioProcessors; - private final int processorCount; - private final Executor executor; - - /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ - private Selector selector; - private Worker worker; - private int processorDistributor = 0; - private int workerTimeout = 60; // 1 min. - - /** Create a connector with a single processing thread using a NewThreadExecutor */ - public MultiThreadSocketConnector() - { - this(1, new NewThreadExecutor()); - } - - /** - * Create a connector with the desired number of processing threads - * - * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads - */ - public MultiThreadSocketConnector(int processorCount, Executor executor) - { - if (processorCount < 1) - { - throw new IllegalArgumentException("Must have at least one processor"); - } - - this.executor = executor; - this.processorCount = processorCount; - ioProcessors = new MultiThreadSocketIoProcessor[processorCount]; - - for (int i = 0; i < processorCount; i++) - { - ioProcessors[i] = new MultiThreadSocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); - } - } - - /** - * How many seconds to keep the connection thread alive between connection requests - * - * @return Number of seconds to keep connection thread alive - */ - public int getWorkerTimeout() - { - return workerTimeout; - } - - /** - * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. - * - * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 - */ - public void setWorkerTimeout(int workerTimeout) - { - if (workerTimeout < 0) - { - throw new IllegalArgumentException("Must be >= 0"); - } - this.workerTimeout = workerTimeout; - } - - public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - return connect(address, null, handler, config); - } - - public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoServiceConfig config) - { - if (address == null) - { - throw new NullPointerException("address"); - } - if (handler == null) - { - throw new NullPointerException("handler"); - } - - if (!(address instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected address type: " - + address.getClass()); - } - - if (localAddress != null && !(localAddress instanceof InetSocketAddress)) - { - throw new IllegalArgumentException("Unexpected local address type: " - + localAddress.getClass()); - } - - if (config == null) - { - config = getDefaultConfig(); - } - - SocketChannel ch = null; - boolean success = false; - try - { - ch = SocketChannel.open(); - ch.socket().setReuseAddress(true); - if (localAddress != null) - { - ch.socket().bind(localAddress); - } - - ch.configureBlocking(false); - - if (ch.connect(address)) - { - DefaultConnectFuture future = new DefaultConnectFuture(); - newSession(ch, handler, config, future); - success = true; - return future; - } - - success = true; - } - catch (IOException e) - { - return DefaultConnectFuture.newFailedFuture(e); - } - finally - { - if (!success && ch != null) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - - ConnectionRequest request = new ConnectionRequest(ch, handler, config); - synchronized (lock) - { - try - { - startupWorker(); - } - catch (IOException e) - { - try - { - ch.close(); - } - catch (IOException e2) - { - ExceptionMonitor.getInstance().exceptionCaught(e2); - } - - return DefaultConnectFuture.newFailedFuture(e); - } - } - - synchronized (connectQueue) - { - connectQueue.push(request); - } - selector.wakeup(); - - return request; - } - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - executor.execute(new NamePreservingRunnable(worker)); - } - } - - private void registerNew() - { - if (connectQueue.isEmpty()) - { - return; - } - - for (; ;) - { - ConnectionRequest req; - synchronized (connectQueue) - { - req = (ConnectionRequest) connectQueue.pop(); - } - - if (req == null) - { - break; - } - - SocketChannel ch = req.channel; - try - { - ch.register(selector, SelectionKey.OP_CONNECT, req); - } - catch (IOException e) - { - req.setException(e); - } - } - } - - private void processSessions(Set keys) - { - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isConnectable()) - { - continue; - } - - SocketChannel ch = (SocketChannel) key.channel(); - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - boolean success = false; - try - { - ch.finishConnect(); - newSession(ch, entry.handler, entry.config, entry); - success = true; - } - catch (Throwable e) - { - entry.setException(e); - } - finally - { - key.cancel(); - if (!success) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - keys.clear(); - } - - private void processTimedOutSessions(Set keys) - { - long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isValid()) - { - continue; - } - - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - if (currentTime >= entry.deadline) - { - entry.setException(new ConnectException()); - try - { - key.channel().close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - key.cancel(); - } - } - } - } - - private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - { - MultiThreadSocketSessionImpl session = - new MultiThreadSocketSessionImpl(this, nextProcessor(), getListeners(), - config, ch, handler, ch.socket().getRemoteSocketAddress()); - - //new interface -// SocketSessionImpl session = new SocketSessionImpl( -// this, nextProcessor(), getListeners(), -// config, ch, handler, ch.socket().getRemoteSocketAddress() ); - try - { - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getThreadModel().buildFilterChain(session.getFilterChain()); - } - catch (Throwable e) - { - throw (IOException) new IOException("Failed to create a session.").initCause(e); - } - - // Set the ConnectFuture of the specified session, which will be - // removed and notified by AbstractIoFilterChain eventually. - session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, connectFuture ); - - // Forward the remaining process to the SocketIoProcessor. - session.getIoProcessor().addNew(session); - } - - private MultiThreadSocketIoProcessor nextProcessor() - { - return ioProcessors[processorDistributor++ % processorCount]; - } - - private class Worker implements Runnable - { - private long lastActive = System.currentTimeMillis(); - - public void run() - { - Thread.currentThread().setName(MultiThreadSocketConnector.this.threadName); - - for (; ;) - { - try - { - int nKeys = selector.select(1000); - - registerNew(); - - if (nKeys > 0) - { - processSessions(selector.selectedKeys()); - } - - processTimedOutSessions(selector.keys()); - - if (selector.keys().isEmpty()) - { - if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) - { - synchronized (lock) - { - if (selector.keys().isEmpty() && - connectQueue.isEmpty()) - { - worker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - else - { - lastActive = System.currentTimeMillis(); - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - } - } - - private class ConnectionRequest extends DefaultConnectFuture - { - private final SocketChannel channel; - private final long deadline; - private final IoHandler handler; - private final IoServiceConfig config; - - private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) - { - this.channel = channel; - long timeout; - if (config instanceof IoConnectorConfig) - { - timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); - } - else - { - timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); - } - this.deadline = System.currentTimeMillis() + timeout; - this.handler = handler; - this.config = config; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java deleted file mode 100644 index 67b8c8d820..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketFilterChain.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import java.io.IOException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.support.AbstractIoFilterChain; -import org.apache.mina.util.Queue; - -/** - * An {@link IoFilterChain} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - */ -class MultiThreadSocketFilterChain extends AbstractIoFilterChain { - - MultiThreadSocketFilterChain( IoSession parent ) - { - super( parent ); - } - - protected void doWrite( IoSession session, WriteRequest writeRequest ) - { - MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; - Queue writeRequestQueue = s.getWriteRequestQueue(); - - // SocketIoProcessor.doFlush() will reset it after write is finished - // because the buffer will be passed with messageSent event. - ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); - synchronized( writeRequestQueue ) - { - writeRequestQueue.push( writeRequest ); - if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) - { - // Notify SocketIoProcessor only when writeRequestQueue was empty. - s.getIoProcessor().flush( s ); - } - } - } - - protected void doClose( IoSession session ) throws IOException - { - MultiThreadSocketSessionImpl s = (MultiThreadSocketSessionImpl) session; - s.getIoProcessor().remove( s ); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java deleted file mode 100644 index c23ad8686f..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java +++ /dev/null @@ -1,1026 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; -import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.WriteTimeoutException; -import org.apache.mina.util.IdentityHashSet; -import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.Queue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $, - */ -class MultiThreadSocketIoProcessor extends SocketIoProcessor -{ - Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class); - Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Reader"); - Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class + ".Writer"); - - private static final long SELECTOR_TIMEOUT = 1000L; - - private int MAX_READ_BYTES_PER_SESSION = 524288; //512K - private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K - - private final Object readLock = new Object(); - private final Object writeLock = new Object(); - - private final String threadName; - private final Executor executor; - - private ReentrantLock trafficMaskUpdateLock = new ReentrantLock(); - - /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ - private volatile Selector selector, writeSelector; - - private final Queue newSessions = new Queue(); - private final Queue removingSessions = new Queue(); - private final BlockingQueue flushingSessions = new LinkedBlockingQueue(); - private final IdentityHashSet flushingSessionsSet = new IdentityHashSet(); - - private final Queue trafficControllingSessions = new Queue(); - - private ReadWorker readWorker; - private WriteWorker writeWorker; - private long lastIdleReadCheckTime = System.currentTimeMillis(); - private long lastIdleWriteCheckTime = System.currentTimeMillis(); - - MultiThreadSocketIoProcessor(String threadName, Executor executor) - { - super(threadName, executor); - this.threadName = threadName; - this.executor = executor; - } - - void addNew(SocketSessionImpl session) throws IOException - { - synchronized (newSessions) - { - newSessions.push(session); - } - - startupWorker(); - - selector.wakeup(); - writeSelector.wakeup(); - } - - void remove(SocketSessionImpl session) throws IOException - { - scheduleRemove(session); - startupWorker(); - selector.wakeup(); - } - - private void startupWorker() throws IOException - { - synchronized (readLock) - { - if (readWorker == null) - { - selector = Selector.open(); - readWorker = new ReadWorker(); - executor.execute(new NamePreservingRunnable(readWorker)); - } - } - - synchronized (writeLock) - { - if (writeWorker == null) - { - writeSelector = Selector.open(); - writeWorker = new WriteWorker(); - executor.execute(new NamePreservingRunnable(writeWorker)); - } - } - - } - - void flush(SocketSessionImpl session) - { - scheduleFlush(session); - Selector selector = this.writeSelector; - - if (selector != null) - { - selector.wakeup(); - } - } - - void updateTrafficMask(SocketSessionImpl session) - { - scheduleTrafficControl(session); - Selector selector = this.selector; - if (selector != null) - { - selector.wakeup(); - } - } - - private void scheduleRemove(SocketSessionImpl session) - { - synchronized (removingSessions) - { - removingSessions.push(session); - } - } - - private void scheduleFlush(SocketSessionImpl session) - { - synchronized (flushingSessionsSet) - { - //if flushingSessions grows to contain Integer.MAX_VALUE sessions - // then this will fail. - if (flushingSessionsSet.add(session)) - { - flushingSessions.offer(session); - } - } - } - - private void scheduleTrafficControl(SocketSessionImpl session) - { - synchronized (trafficControllingSessions) - { - trafficControllingSessions.push(session); - } - } - - private void doAddNewReader() throws InterruptedException - { - if (newSessions.isEmpty()) - { - return; - } - - for (; ;) - { - MultiThreadSocketSessionImpl session; - - synchronized (newSessions) - { - session = (MultiThreadSocketSessionImpl) newSessions.peek(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - - - try - { - - ch.configureBlocking(false); - session.setSelectionKey(ch.register(selector, - SelectionKey.OP_READ, - session)); - - //System.out.println("ReadDebug:"+"Awaiting Registration"); - session.awaitRegistration(); - sessionCreated(session); - } - catch (IOException e) - { - // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute - // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught(session, e); - } - } - } - - - private void doAddNewWrite() throws InterruptedException - { - if (newSessions.isEmpty()) - { - return; - } - - for (; ;) - { - MultiThreadSocketSessionImpl session; - - synchronized (newSessions) - { - session = (MultiThreadSocketSessionImpl) newSessions.peek(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - - try - { - ch.configureBlocking(false); - synchronized (flushingSessionsSet) - { - flushingSessionsSet.add(session); - } - - session.setWriteSelectionKey(ch.register(writeSelector, - SelectionKey.OP_WRITE, - session)); - - //System.out.println("WriteDebug:"+"Awaiting Registration"); - session.awaitRegistration(); - sessionCreated(session); - } - catch (IOException e) - { - - // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute - // and call ConnectFuture.setException(). - session.getFilterChain().fireExceptionCaught(session, e); - } - } - } - - - private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException - { - MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - synchronized (newSessions) - { - if (!session.created()) - { - _logger.debug("Popping new session"); - newSessions.pop(); - - // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here - // in AbstractIoFilterChain.fireSessionOpened(). - session.getServiceListeners().fireSessionCreated(session); - - session.doneCreation(); - } - } - } - - private void doRemove() - { - if (removingSessions.isEmpty()) - { - return; - } - - for (; ;) - { - MultiThreadSocketSessionImpl session; - - synchronized (removingSessions) - { - session = (MultiThreadSocketSessionImpl) removingSessions.pop(); - } - - if (session == null) - { - break; - } - - SocketChannel ch = session.getChannel(); - SelectionKey key = session.getReadSelectionKey(); - SelectionKey writeKey = session.getWriteSelectionKey(); - - // Retry later if session is not yet fully initialized. - // (In case that Session.close() is called before addSession() is processed) - if (key == null || writeKey == null) - { - scheduleRemove(session); - break; - } - // skip if channel is already closed - if (!key.isValid() || !writeKey.isValid()) - { - continue; - } - - try - { - //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); - synchronized (readLock) - { - key.cancel(); - } - synchronized (writeLock) - { - writeKey.cancel(); - } - ch.close(); - } - catch (IOException e) - { - session.getFilterChain().fireExceptionCaught(session, e); - } - finally - { - releaseWriteBuffers(session); - session.getServiceListeners().fireSessionDestroyed(session); - } - } - } - - private void processRead(Set selectedKeys) - { - Iterator it = selectedKeys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); - - synchronized (readLock) - { - if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) - { - read(session); - } - } - - } - - selectedKeys.clear(); - } - - private void processWrite(Set selectedKeys) - { - Iterator it = selectedKeys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - - synchronized (writeLock) - { - if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) - { - - // Clear OP_WRITE - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - - synchronized (flushingSessionsSet) - { - flushingSessions.offer(session); - } - } - } - } - - selectedKeys.clear(); - } - - private void read(SocketSessionImpl session) - { - - //if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); - } - - int totalReadBytes = 0; - - while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) - { - ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); - SocketChannel ch = session.getChannel(); - - try - { - buf.clear(); - - int readBytes = 0; - int ret; - - try - { - while ((ret = ch.read(buf.buf())) > 0) - { - readBytes += ret; - totalReadBytes += ret; - } - } - finally - { - buf.flip(); - } - - - if (readBytes > 0) - { - session.increaseReadBytes(readBytes); - - session.getFilterChain().fireMessageReceived(session, buf); - buf = null; - } - - if (ret <= 0) - { - if (ret == 0) - { - if (readBytes == session.getReadBufferSize()) - { - continue; - } - } - else - { - scheduleRemove(session); - } - - break; - } - } - catch (Throwable e) - { - if (e instanceof IOException) - { - scheduleRemove(session); - } - session.getFilterChain().fireExceptionCaught(session, e); - - //Stop Reading this session. - return; - } - finally - { - if (buf != null) - { - buf.release(); - } - } - }//for - - // if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); - } - } - - - private void notifyReadIdleness() - { - // process idle sessions - long currentTime = System.currentTimeMillis(); - if ((currentTime - lastIdleReadCheckTime) >= 1000) - { - lastIdleReadCheckTime = currentTime; - Set keys = selector.keys(); - if (keys != null) - { - for (Iterator it = keys.iterator(); it.hasNext();) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - notifyReadIdleness(session, currentTime); - } - } - } - } - - private void notifyWriteIdleness() - { - // process idle sessions - long currentTime = System.currentTimeMillis(); - if ((currentTime - lastIdleWriteCheckTime) >= 1000) - { - lastIdleWriteCheckTime = currentTime; - Set keys = writeSelector.keys(); - if (keys != null) - { - for (Iterator it = keys.iterator(); it.hasNext();) - { - SelectionKey key = (SelectionKey) it.next(); - SocketSessionImpl session = (SocketSessionImpl) key.attachment(); - notifyWriteIdleness(session, currentTime); - } - } - } - } - - private void notifyReadIdleness(SocketSessionImpl session, long currentTime) - { - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), - IdleStatus.BOTH_IDLE, - Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.READER_IDLE), - IdleStatus.READER_IDLE, - Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); - - notifyWriteTimeout(session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime()); - } - - private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) - { - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), - IdleStatus.BOTH_IDLE, - Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); - notifyIdleness0( - session, currentTime, - session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), - IdleStatus.WRITER_IDLE, - Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); - - notifyWriteTimeout(session, currentTime, session - .getWriteTimeoutInMillis(), session.getLastWriteTime()); - } - - private void notifyIdleness0(SocketSessionImpl session, long currentTime, - long idleTime, IdleStatus status, - long lastIoTime) - { - if (idleTime > 0 && lastIoTime != 0 - && (currentTime - lastIoTime) >= idleTime) - { - session.increaseIdleCount(status); - session.getFilterChain().fireSessionIdle(session, status); - } - } - - private void notifyWriteTimeout(SocketSessionImpl session, - long currentTime, - long writeTimeout, long lastIoTime) - { - - MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; - SelectionKey key = sesh.getWriteSelectionKey(); - - synchronized (writeLock) - { - if (writeTimeout > 0 - && (currentTime - lastIoTime) >= writeTimeout - && key != null && key.isValid() - && (key.interestOps() & SelectionKey.OP_WRITE) != 0) - { - session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); - } - } - } - - private SocketSessionImpl getNextFlushingSession() - { - return (SocketSessionImpl) flushingSessions.poll(); - } - - private void releaseSession(SocketSessionImpl session) - { - synchronized (session.getWriteRequestQueue()) - { - synchronized (flushingSessionsSet) - { - if (session.getScheduledWriteRequests() > 0) - { - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); - } - flushingSessions.offer(session); - } - else - { - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); - } - flushingSessionsSet.remove(session); - } - } - } - } - - private void releaseWriteBuffers(SocketSessionImpl session) - { - Queue writeRequestQueue = session.getWriteRequestQueue(); - WriteRequest req; - - //Should this be synchronized? - synchronized (writeRequestQueue) - { - while ((req = (WriteRequest) writeRequestQueue.pop()) != null) - { - try - { - ((ByteBuffer) req.getMessage()).release(); - } - catch (IllegalStateException e) - { - session.getFilterChain().fireExceptionCaught(session, e); - } - finally - { - req.getFuture().setWritten(false); - } - } - } - } - - private void doFlush() - { - MultiThreadSocketSessionImpl session; - - while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) - { - if (!session.isConnected()) - { - releaseWriteBuffers(session); - releaseSession(session); - continue; - } - - SelectionKey key = session.getWriteSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.write() is called before addSession() is processed) - if (key == null) - { - scheduleFlush(session); - releaseSession(session); - continue; - } - // skip if channel is already closed - if (!key.isValid()) - { - releaseSession(session); - continue; - } - - try - { - if (doFlush(session)) - { - releaseSession(session); - } - } - catch (IOException e) - { - releaseSession(session); - scheduleRemove(session); - session.getFilterChain().fireExceptionCaught(session, e); - } - - } - - } - - private boolean doFlush(SocketSessionImpl sessionParam) throws IOException - { - MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; - // Clear OP_WRITE - SelectionKey key = session.getWriteSelectionKey(); - synchronized (writeLock) - { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - } - SocketChannel ch = session.getChannel(); - Queue writeRequestQueue = session.getWriteRequestQueue(); - - long totalFlushedBytes = 0; - while (true) - { - WriteRequest req; - - synchronized (writeRequestQueue) - { - req = (WriteRequest) writeRequestQueue.first(); - } - - if (req == null) - { - break; - } - - ByteBuffer buf = (ByteBuffer) req.getMessage(); - if (buf.remaining() == 0) - { - synchronized (writeRequestQueue) - { - writeRequestQueue.pop(); - } - - session.increaseWrittenMessages(); - - buf.reset(); - session.getFilterChain().fireMessageSent(session, req); - continue; - } - - - int writtenBytes = 0; - - // Reported as DIRMINA-362 - //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. - if (key.isWritable()) - { - writtenBytes = ch.write(buf.buf()); - totalFlushedBytes += writtenBytes; - } - - if (writtenBytes > 0) - { - session.increaseWrittenBytes(writtenBytes); - } - - if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) - { - // Kernel buffer is full - synchronized (writeLock) - { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); - } - return false; - } - } - - if (_loggerWrite.isDebugEnabled()) - { - //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); - } - return true; - } - - private void doUpdateTrafficMask() - { - if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) - { - return; - } - - // Synchronize over entire operation as this method should be called - // from both read and write thread and we don't want the order of the - // updates to get changed. - trafficMaskUpdateLock.lock(); - try - { - for (; ;) - { - MultiThreadSocketSessionImpl session; - - session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); - - if (session == null) - { - break; - } - - SelectionKey key = session.getReadSelectionKey(); - // Retry later if session is not yet fully initialized. - // (In case that Session.suspend??() or session.resume??() is - // called before addSession() is processed) - if (key == null) - { - scheduleTrafficControl(session); - break; - } - // skip if channel is already closed - if (!key.isValid()) - { - continue; - } - - // The normal is OP_READ and, if there are write requests in the - // session's write queue, set OP_WRITE to trigger flushing. - - //Sset to Read and Write if there is nothing then the cost - // is one loop through the flusher. - int ops = SelectionKey.OP_READ; - - // Now mask the preferred ops with the mask of the current session - int mask = session.getTrafficMask().getInterestOps(); - synchronized (readLock) - { - key.interestOps(ops & mask); - } - //Change key to the WriteSelection Key - key = session.getWriteSelectionKey(); - if (key != null && key.isValid()) - { - Queue writeRequestQueue = session.getWriteRequestQueue(); - synchronized (writeRequestQueue) - { - if (!writeRequestQueue.isEmpty()) - { - ops = SelectionKey.OP_WRITE; - synchronized (writeLock) - { - key.interestOps(ops & mask); - } - } - } - } - } - } - finally - { - trafficMaskUpdateLock.unlock(); - } - - } - - private class WriteWorker implements Runnable - { - - public void run() - { - Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); - - //System.out.println("WriteDebug:"+"Startup"); - for (; ;) - { - try - { - int nKeys = writeSelector.select(SELECTOR_TIMEOUT); - - doAddNewWrite(); - doUpdateTrafficMask(); - - if (nKeys > 0) - { - //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); - processWrite(writeSelector.selectedKeys()); - } - else - { - //System.out.println("WriteDebug:"+"No keys from writeselector"); - } - - doRemove(); - notifyWriteIdleness(); - - if (flushingSessionsSet.size() > 0) - { - doFlush(); - } - - if (writeSelector.keys().isEmpty()) - { - synchronized (writeLock) - { - - if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) - { - writeWorker = null; - try - { - writeSelector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - writeSelector = null; - } - - break; - } - } - } - - } - catch (Throwable t) - { - ExceptionMonitor.getInstance().exceptionCaught(t); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - //System.out.println("WriteDebug:"+"Shutdown"); - } - - } - - private class ReadWorker implements Runnable - { - - public void run() - { - Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); - - //System.out.println("ReadDebug:"+"Startup"); - for (; ;) - { - try - { - int nKeys = selector.select(SELECTOR_TIMEOUT); - - doAddNewReader(); - doUpdateTrafficMask(); - - if (nKeys > 0) - { - //System.out.println("ReadDebug:"+nKeys + " keys from selector"); - - processRead(selector.selectedKeys()); - } - else - { - //System.out.println("ReadDebug:"+"No keys from selector"); - } - - - doRemove(); - notifyReadIdleness(); - - if (selector.keys().isEmpty()) - { - - synchronized (readLock) - { - if (selector.keys().isEmpty() && newSessions.isEmpty()) - { - readWorker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - - break; - } - } - } - } - catch (Throwable t) - { - ExceptionMonitor.getInstance().exceptionCaught(t); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - //System.out.println("ReadDebug:"+"Shutdown"); - } - - } -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java deleted file mode 100644 index 043d4800b6..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionConfigImpl.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoConnectorConfig; -import org.apache.mina.common.support.BaseIoSessionConfig; - -import java.io.IOException; -import java.net.Socket; -import java.net.SocketException; - -/** - * An {@link IoConnectorConfig} for {@link SocketConnector}. - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -public class MultiThreadSocketSessionConfigImpl extends org.apache.mina.transport.socket.nio.SocketSessionConfigImpl -{ - private static boolean SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; - private static boolean SET_SEND_BUFFER_SIZE_AVAILABLE = false; - private static boolean GET_TRAFFIC_CLASS_AVAILABLE = false; - private static boolean SET_TRAFFIC_CLASS_AVAILABLE = false; - - private static boolean DEFAULT_REUSE_ADDRESS; - private static int DEFAULT_RECEIVE_BUFFER_SIZE; - private static int DEFAULT_SEND_BUFFER_SIZE; - private static int DEFAULT_TRAFFIC_CLASS; - private static boolean DEFAULT_KEEP_ALIVE; - private static boolean DEFAULT_OOB_INLINE; - private static int DEFAULT_SO_LINGER; - private static boolean DEFAULT_TCP_NO_DELAY; - - static - { - initialize(); - } - - private static void initialize() - { - Socket socket = null; - - socket = new Socket(); - - try - { - DEFAULT_REUSE_ADDRESS = socket.getReuseAddress(); - DEFAULT_RECEIVE_BUFFER_SIZE = socket.getReceiveBufferSize(); - DEFAULT_SEND_BUFFER_SIZE = socket.getSendBufferSize(); - DEFAULT_KEEP_ALIVE = socket.getKeepAlive(); - DEFAULT_OOB_INLINE = socket.getOOBInline(); - DEFAULT_SO_LINGER = socket.getSoLinger(); - DEFAULT_TCP_NO_DELAY = socket.getTcpNoDelay(); - - // Check if setReceiveBufferSize is supported. - try - { - socket.setReceiveBufferSize(DEFAULT_RECEIVE_BUFFER_SIZE); - SET_RECEIVE_BUFFER_SIZE_AVAILABLE = true; - } - catch( SocketException e ) - { - SET_RECEIVE_BUFFER_SIZE_AVAILABLE = false; - } - - // Check if setSendBufferSize is supported. - try - { - socket.setSendBufferSize(DEFAULT_SEND_BUFFER_SIZE); - SET_SEND_BUFFER_SIZE_AVAILABLE = true; - } - catch( SocketException e ) - { - SET_SEND_BUFFER_SIZE_AVAILABLE = false; - } - - // Check if getTrafficClass is supported. - try - { - DEFAULT_TRAFFIC_CLASS = socket.getTrafficClass(); - GET_TRAFFIC_CLASS_AVAILABLE = true; - } - catch( SocketException e ) - { - GET_TRAFFIC_CLASS_AVAILABLE = false; - DEFAULT_TRAFFIC_CLASS = 0; - } - } - catch( SocketException e ) - { - throw new ExceptionInInitializerError(e); - } - finally - { - if( socket != null ) - { - try - { - socket.close(); - } - catch( IOException e ) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - public static boolean isSetReceiveBufferSizeAvailable() { - return SET_RECEIVE_BUFFER_SIZE_AVAILABLE; - } - - public static boolean isSetSendBufferSizeAvailable() { - return SET_SEND_BUFFER_SIZE_AVAILABLE; - } - - public static boolean isGetTrafficClassAvailable() { - return GET_TRAFFIC_CLASS_AVAILABLE; - } - - public static boolean isSetTrafficClassAvailable() { - return SET_TRAFFIC_CLASS_AVAILABLE; - } - - private boolean reuseAddress = DEFAULT_REUSE_ADDRESS; - private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; - private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; - private int trafficClass = DEFAULT_TRAFFIC_CLASS; - private boolean keepAlive = DEFAULT_KEEP_ALIVE; - private boolean oobInline = DEFAULT_OOB_INLINE; - private int soLinger = DEFAULT_SO_LINGER; - private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; - - /** - * Creates a new instance. - */ - MultiThreadSocketSessionConfigImpl() - { - } - - public boolean isReuseAddress() - { - return reuseAddress; - } - - public void setReuseAddress( boolean reuseAddress ) - { - this.reuseAddress = reuseAddress; - } - - public int getReceiveBufferSize() - { - return receiveBufferSize; - } - - public void setReceiveBufferSize( int receiveBufferSize ) - { - this.receiveBufferSize = receiveBufferSize; - } - - public int getSendBufferSize() - { - return sendBufferSize; - } - - public void setSendBufferSize( int sendBufferSize ) - { - this.sendBufferSize = sendBufferSize; - } - - public int getTrafficClass() - { - return trafficClass; - } - - public void setTrafficClass( int trafficClass ) - { - this.trafficClass = trafficClass; - } - - public boolean isKeepAlive() - { - return keepAlive; - } - - public void setKeepAlive( boolean keepAlive ) - { - this.keepAlive = keepAlive; - } - - public boolean isOobInline() - { - return oobInline; - } - - public void setOobInline( boolean oobInline ) - { - this.oobInline = oobInline; - } - - public int getSoLinger() - { - return soLinger; - } - - public void setSoLinger( int soLinger ) - { - this.soLinger = soLinger; - } - - public boolean isTcpNoDelay() - { - return tcpNoDelay; - } - - public void setTcpNoDelay( boolean tcpNoDelay ) - { - this.tcpNoDelay = tcpNoDelay; - } - - -} diff --git a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java deleted file mode 100644 index be4a2d289d..0000000000 --- a/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketSessionImpl.java +++ /dev/null @@ -1,488 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import org.apache.mina.common.IoFilter.WriteRequest; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoService; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoSessionConfig; -import org.apache.mina.common.RuntimeIOException; -import org.apache.mina.common.TransportType; -import org.apache.mina.common.support.BaseIoSessionConfig; -import org.apache.mina.common.support.IoServiceListenerSupport; -import org.apache.mina.util.Queue; - -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * An {@link IoSession} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ - */ -class MultiThreadSocketSessionImpl extends SocketSessionImpl -{ - private final IoService manager; - private final IoServiceConfig serviceConfig; - private final SocketSessionConfig config = new SessionConfigImpl(); - private final MultiThreadSocketIoProcessor ioProcessor; - private final MultiThreadSocketFilterChain filterChain; - private final SocketChannel ch; - private final Queue writeRequestQueue; - private final IoHandler handler; - private final SocketAddress remoteAddress; - private final SocketAddress localAddress; - private final SocketAddress serviceAddress; - private final IoServiceListenerSupport serviceListeners; - private SelectionKey readKey, writeKey; - private int readBufferSize; - private CountDownLatch registeredReadyLatch = new CountDownLatch(2); - private AtomicBoolean created = new AtomicBoolean(false); - - /** - * Creates a new instance. - */ - MultiThreadSocketSessionImpl( IoService manager, - SocketIoProcessor ioProcessor, - IoServiceListenerSupport listeners, - IoServiceConfig serviceConfig, - SocketChannel ch, - IoHandler defaultHandler, - SocketAddress serviceAddress ) - { - super(manager, ioProcessor, listeners, serviceConfig, ch,defaultHandler,serviceAddress); - this.manager = manager; - this.serviceListeners = listeners; - this.ioProcessor = (MultiThreadSocketIoProcessor) ioProcessor; - this.filterChain = new MultiThreadSocketFilterChain(this); - this.ch = ch; - this.writeRequestQueue = new Queue(); - this.handler = defaultHandler; - this.remoteAddress = ch.socket().getRemoteSocketAddress(); - this.localAddress = ch.socket().getLocalSocketAddress(); - this.serviceAddress = serviceAddress; - this.serviceConfig = serviceConfig; - - // Apply the initial session settings - IoSessionConfig sessionConfig = serviceConfig.getSessionConfig(); - if( sessionConfig instanceof SocketSessionConfig ) - { - SocketSessionConfig cfg = ( SocketSessionConfig ) sessionConfig; - this.config.setKeepAlive( cfg.isKeepAlive() ); - this.config.setOobInline( cfg.isOobInline() ); - this.config.setReceiveBufferSize( cfg.getReceiveBufferSize() ); - this.readBufferSize = cfg.getReceiveBufferSize(); - this.config.setReuseAddress( cfg.isReuseAddress() ); - this.config.setSendBufferSize( cfg.getSendBufferSize() ); - this.config.setSoLinger( cfg.getSoLinger() ); - this.config.setTcpNoDelay( cfg.isTcpNoDelay() ); - - if( this.config.getTrafficClass() != cfg.getTrafficClass() ) - { - this.config.setTrafficClass( cfg.getTrafficClass() ); - } - } - } - - void awaitRegistration() throws InterruptedException - { - registeredReadyLatch.countDown(); - - registeredReadyLatch.await(); - } - - boolean created() throws InterruptedException - { - return created.get(); - } - - void doneCreation() - { - created.getAndSet(true); - } - - public IoService getService() - { - return manager; - } - - public IoServiceConfig getServiceConfig() - { - return serviceConfig; - } - - public IoSessionConfig getConfig() - { - return config; - } - - SocketIoProcessor getIoProcessor() - { - return ioProcessor; - } - - public IoFilterChain getFilterChain() - { - return filterChain; - } - - SocketChannel getChannel() - { - return ch; - } - - IoServiceListenerSupport getServiceListeners() - { - return serviceListeners; - } - - SelectionKey getSelectionKey() - { - return readKey; - } - - SelectionKey getReadSelectionKey() - { - return readKey; - } - - SelectionKey getWriteSelectionKey() - { - return writeKey; - } - - void setSelectionKey(SelectionKey key) - { - this.readKey = key; - } - - void setWriteSelectionKey(SelectionKey key) - { - this.writeKey = key; - } - - public IoHandler getHandler() - { - return handler; - } - - protected void close0() - { - filterChain.fireFilterClose( this ); - } - - Queue getWriteRequestQueue() - { - return writeRequestQueue; - } - - /** - @return int Number of write scheduled write requests - @deprecated - */ - public int getScheduledWriteMessages() - { - return getScheduledWriteRequests(); - } - - public int getScheduledWriteRequests() - { - synchronized( writeRequestQueue ) - { - return writeRequestQueue.size(); - } - } - - public int getScheduledWriteBytes() - { - synchronized( writeRequestQueue ) - { - return writeRequestQueue.byteSize(); - } - } - - protected void write0( WriteRequest writeRequest ) - { - filterChain.fireFilterWrite( this, writeRequest ); - } - - public TransportType getTransportType() - { - return TransportType.SOCKET; - } - - public SocketAddress getRemoteAddress() - { - //This is what I had previously -// return ch.socket().getRemoteSocketAddress(); - return remoteAddress; - } - - public SocketAddress getLocalAddress() - { - //This is what I had previously -// return ch.socket().getLocalSocketAddress(); - return localAddress; - } - - public SocketAddress getServiceAddress() - { - return serviceAddress; - } - - protected void updateTrafficMask() - { - this.ioProcessor.updateTrafficMask( this ); - } - - int getReadBufferSize() - { - return readBufferSize; - } - - private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig - { - public boolean isKeepAlive() - { - try - { - return ch.socket().getKeepAlive(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setKeepAlive( boolean on ) - { - try - { - ch.socket().setKeepAlive( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public boolean isOobInline() - { - try - { - return ch.socket().getOOBInline(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setOobInline( boolean on ) - { - try - { - ch.socket().setOOBInline( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public boolean isReuseAddress() - { - try - { - return ch.socket().getReuseAddress(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setReuseAddress( boolean on ) - { - try - { - ch.socket().setReuseAddress( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public int getSoLinger() - { - try - { - return ch.socket().getSoLinger(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setSoLinger( int linger ) - { - try - { - if( linger < 0 ) - { - ch.socket().setSoLinger( false, 0 ); - } - else - { - ch.socket().setSoLinger( true, linger ); - } - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public boolean isTcpNoDelay() - { - try - { - return ch.socket().getTcpNoDelay(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setTcpNoDelay( boolean on ) - { - try - { - ch.socket().setTcpNoDelay( on ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public int getTrafficClass() - { - if( SocketSessionConfigImpl.isGetTrafficClassAvailable() ) - { - try - { - return ch.socket().getTrafficClass(); - } - catch( SocketException e ) - { - // Throw an exception only when setTrafficClass is also available. - if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) - { - throw new RuntimeIOException( e ); - } - } - } - - return 0; - } - - public void setTrafficClass( int tc ) - { - if( SocketSessionConfigImpl.isSetTrafficClassAvailable() ) - { - try - { - ch.socket().setTrafficClass( tc ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - } - - public int getSendBufferSize() - { - try - { - return ch.socket().getSendBufferSize(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setSendBufferSize( int size ) - { - if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() ) - { - try - { - ch.socket().setSendBufferSize( size ); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - } - - public int getReceiveBufferSize() - { - try - { - return ch.socket().getReceiveBufferSize(); - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - - public void setReceiveBufferSize( int size ) - { - if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() ) - { - try - { - ch.socket().setReceiveBufferSize( size ); - MultiThreadSocketSessionImpl.this.readBufferSize = size; - } - catch( SocketException e ) - { - throw new RuntimeIOException( e ); - } - } - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java deleted file mode 100644 index 5423bbb68f..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid; - -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.mina.MinaHandler; - -import static org.apache.qpid.transport.util.Functions.str; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - - -/** - * ToyBroker - * - * @author Rafael H. Schloming - */ - -class ToyBroker extends SessionDelegate -{ - - private ToyExchange exchange; - private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); - - public ToyBroker(ToyExchange exchange) - { - this.exchange = exchange; - } - - public void messageAcquire(Session context, MessageAcquire struct) - { - System.out.println("\n==================> messageAcquire " ); - context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); - } - - @Override public void queueDeclare(Session ssn, QueueDeclare qd) - { - exchange.createQueue(qd.getQueue()); - System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); - } - - @Override public void exchangeBind(Session ssn, ExchangeBind qb) - { - exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); - System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); - } - - @Override public void queueQuery(Session ssn, QueueQuery qq) - { - QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); - ssn.executionResult((int) qq.getId(), result); - } - - @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) - { - Consumer c = new Consumer(); - c._queueName = ms.getQueue(); - consumers.put(ms.getDestination(),c); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); - } - - @Override public void messageFlow(Session ssn,MessageFlow struct) - { - Consumer c = consumers.get(struct.getDestination()); - c._credit = struct.getValue(); - System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); - } - - @Override public void messageFlush(Session ssn,MessageFlush struct) - { - System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); - checkAndSendMessagesToConsumer(ssn,struct.getDestination()); - } - - @Override public void messageTransfer(Session ssn, MessageTransfer xfr) - { - String dest = xfr.getDestination(); - System.out.println("received transfer " + dest); - Header header = xfr.getHeader(); - DeliveryProperties props = header.get(DeliveryProperties.class); - if (props != null) - { - System.out.println("received headers routing_key " + props.getRoutingKey()); - } - - MessageProperties mp = header.get(MessageProperties.class); - System.out.println("MP: " + mp); - if (mp != null) - { - System.out.println(mp.getApplicationHeaders()); - } - - if (exchange.route(dest,props == null ? null : props.getRoutingKey(),xfr)) - { - System.out.println("queued " + xfr); - dispatchMessages(ssn); - } - else - { - - if (props == null || !props.getDiscardUnroutable()) - { - RangeSet ranges = new RangeSet(); - ranges.add(xfr.getId()); - ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, - "no such destination"); - } - } - ssn.processed(xfr); - } - - private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) - { - System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - m.getHeader(), m.getBody()); - } - - private void dispatchMessages(Session ssn) - { - for (String dest: consumers.keySet()) - { - checkAndSendMessagesToConsumer(ssn,dest); - } - } - - private void checkAndSendMessagesToConsumer(Session ssn,String dest) - { - Consumer c = consumers.get(dest); - LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); - MessageTransfer m = queue.poll(); - while (m != null && c._credit>0) - { - transferMessageToPeer(ssn,dest,m); - c._credit--; - m = queue.poll(); - } - } - - // ugly, but who cares :) - // assumes unit is always no of messages, not bytes - // assumes it's credit mode and not window - private static class Consumer - { - long _credit; - String _queueName; - } - - private static final class ToyBrokerSession extends Session - { - - public ToyBrokerSession(Connection connection, Binary name, long expiry, ToyExchange exchange) - { - super(connection, new ToyBroker(exchange), name, expiry); - } - } - - public static final void main(String[] args) throws IOException - { - final ToyExchange exchange = new ToyExchange(); - ConnectionDelegate delegate = new ServerDelegate() - { - @Override - public void init(Connection conn, ProtocolHeader hdr) - { - conn.setSessionFactory(new Connection.SessionFactory() - { - public Session newSession(Connection conn, Binary name, long expiry) - { - return new ToyBrokerSession(conn, name, expiry, exchange); - } - }); - - super.init(conn, hdr); //To change body of overridden methods use File | Settings | File Templates. - } - - }; - - MinaHandler.accept("0.0.0.0", 5672, delegate); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java deleted file mode 100644 index 5b2db10613..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid; - -import java.nio.*; -import java.util.*; - -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.mina.MinaHandler; - - -/** - * ToyClient - * - * @author Rafael H. Schloming - */ - -class ToyClient implements SessionListener -{ - public void opened(Session ssn) {} - - public void resumed(Session ssn) {} - - public void exception(Session ssn, SessionException exc) - { - exc.printStackTrace(); - } - - public void message(Session ssn, MessageTransfer xfr) - { - System.out.println("msg: " + xfr); - } - - public void closed(Session ssn) {} - - public static final void main(String[] args) - { - Connection conn = new Connection(); - conn.connect("0.0.0.0", 5672, null, "guest", "guest", false); - Session ssn = conn.createSession(); - ssn.setSessionListener(new ToyClient()); - - ssn.queueDeclare("asdf", null, null); - ssn.sync(); - - Map<String,Object> nested = new LinkedHashMap<String,Object>(); - nested.put("list", Arrays.asList("one", "two", "three")); - Map<String,Object> map = new LinkedHashMap<String,Object>(); - - map.put("str", "this is a string"); - - map.put("+int", 3); - map.put("-int", -3); - map.put("maxint", Integer.MAX_VALUE); - map.put("minint", Integer.MIN_VALUE); - - map.put("+short", (short) 1); - map.put("-short", (short) -1); - map.put("maxshort", (short) Short.MAX_VALUE); - map.put("minshort", (short) Short.MIN_VALUE); - - map.put("float", (float) 3.3); - map.put("double", 4.9); - map.put("char", 'c'); - - map.put("table", nested); - map.put("list", Arrays.asList(1, 2, 3)); - map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); - - ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - new Header(new DeliveryProperties(), - new MessageProperties() - .setApplicationHeaders(map)), - "this is the data"); - - ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - null, - "this should be rejected"); - ssn.sync(); - - Future<QueueQueryResult> future = ssn.queueQuery("asdf"); - System.out.println(future.get().getQueue()); - ssn.sync(); - ssn.close(); - conn.close(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java deleted file mode 100644 index da6aed9629..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java +++ /dev/null @@ -1,154 +0,0 @@ -package org.apache.qpid; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.qpid.transport.MessageTransfer; - - -public class ToyExchange -{ - final static String DIRECT = "amq.direct"; - final static String TOPIC = "amq.topic"; - - private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); - private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); - private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>(); - - public void createQueue(String name) - { - queues.put(name, new LinkedBlockingQueue<MessageTransfer>()); - } - - public LinkedBlockingQueue<MessageTransfer> getQueue(String name) - { - return queues.get(name); - } - - public void bindQueue(String type,String binding,String queueName) - { - LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName); - binding = normalizeKey(binding); - if(DIRECT.equals(type)) - { - - if (directEx.containsKey(binding)) - { - List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding); - list.add(queue); - } - else - { - List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); - list.add(queue); - directEx.put(binding,list); - } - } - else - { - if (topicEx.containsKey(binding)) - { - List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding); - list.add(queue); - } - else - { - List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); - list.add(queue); - topicEx.put(binding,list); - } - } - } - - public boolean route(String dest, String routingKey, MessageTransfer msg) - { - List<LinkedBlockingQueue<MessageTransfer>> queues; - if(DIRECT.equals(dest)) - { - queues = directEx.get(routingKey); - } - else - { - queues = matchWildCard(routingKey); - } - if(queues != null && queues.size()>0) - { - System.out.println("Message stored in " + queues.size() + " queues"); - storeMessage(msg,queues); - return true; - } - else - { - System.out.println("Message unroutable " + msg); - return false; - } - } - - private String normalizeKey(String routingKey) - { - if(routingKey.indexOf(".*")>1) - { - return routingKey.substring(0,routingKey.indexOf(".*")); - } - else - { - return routingKey; - } - } - - private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey) - { - List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>(); - - for(String key: topicEx.keySet()) - { - Pattern p = Pattern.compile(key); - Matcher m = p.matcher(routingKey); - if (m.find()) - { - for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key)) - { - selected.add(queue); - } - } - } - - return selected; - } - - private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected) - { - for(LinkedBlockingQueue<MessageTransfer> queue : selected) - { - queue.offer(msg); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 0dd21238a7..5fa14cf103 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -86,35 +86,6 @@ public class ClientProperties public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; - /** - * ========================================================== - * Those properties are used when the io size should be bounded - * ========================================================== - */ - - /** - * When set to true the io layer throttle down producers and consumers - * when written or read messages are accumulating and exceeding a certain size. - * This is especially useful when a the producer rate is greater than the network - * speed. - * type: boolean - */ - public static final String PROTECTIO_PROP_NAME = "protectio"; - - //=== The following properties are only used when the previous one is true. - /** - * Max size of read messages that can be stored within the MINA layer - * type: int - */ - public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; - public static final String READ_BUFFER_LIMIT_DEFAULT = "262144"; - /** - * Max size of written messages that can be stored within the MINA layer - * type: int - */ - public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; - public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144"; - public static final String AMQP_VERSION = "qpid.amqp.version"; private static ClientProperties _instance = new ClientProperties(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java deleted file mode 100644 index ecc5f6d07c..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java +++ /dev/null @@ -1,130 +0,0 @@ -package org.apache.qpid.transport.network.io; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQMethodBodyFactory; -import org.apache.qpid.framing.BodyFactory; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentBodyFactory; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderBodyFactory; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.HeartbeatBodyFactory; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.transport.Receiver; - -public class InputHandler_0_9 implements Receiver<ByteBuffer> -{ - - private AMQVersionAwareProtocolSession _session; - private MethodRegistry _registry; - private BodyFactory bodyFactory; - private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; - - static - { - _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); - _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); - _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); - } - - public InputHandler_0_9(AMQVersionAwareProtocolSession session) - { - _session = session; - _registry = _session.getMethodRegistry(); - } - - public void closed() - { - // AS FIXME: implement - } - - public void exception(Throwable t) - { - // TODO: propogate exception to things - t.printStackTrace(); - } - - public void received(ByteBuffer buf) - { - org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf); - try - { - final byte type = in.get(); - if (type == AMQMethodBody.TYPE) - { - bodyFactory = new AMQMethodBodyFactory(_session); - } - else - { - bodyFactory = _bodiesSupported[type]; - } - - if (bodyFactory == null) - { - throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); - } - - final int channel = in.getUnsignedShort(); - final long bodySize = in.getUnsignedInt(); - - // bodySize can be zero - if ((channel < 0) || (bodySize < 0)) - { - throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize, null); - } - - AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); - - byte marker = in.get(); - if ((marker & 0xFF) != 0xCE) - { - throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type, null); - } - - try - { - frame.getBodyFrame().handle(frame.getChannel(), _session); - } - catch (AMQException e) - { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - catch (AMQFrameDecodingException e) - { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index bfdbb34978..f261111777 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -19,25 +19,16 @@ package org.apache.qpid.transport.network.io; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.Socket; -import java.net.SocketException; import java.nio.ByteBuffer; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.security.ssl.SSLReceiver; import org.apache.qpid.transport.network.security.ssl.SSLSender; import org.apache.qpid.transport.util.Logger; @@ -134,82 +125,6 @@ public final class IoTransport<E> implements IoContext return socket; } - public static final <E> E connect(String host, int port, - Binding<E,ByteBuffer> binding, - boolean ssl) - { - Socket socket = createSocket(host, port); - IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl); - return transport.endpoint; - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate, - boolean ssl) - { - return connect(host, port, ConnectionBinding.get(delegate),ssl); - } - - public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl) - { - connect(host, port, new Binding_0_9(session),ssl); - } - - private static class Binding_0_9 - implements Binding<AMQVersionAwareProtocolSession,ByteBuffer> - { - - private AMQVersionAwareProtocolSession session; - - Binding_0_9(AMQVersionAwareProtocolSession session) - { - this.session = session; - } - - public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender) - { - session.setSender(sender); - return session; - } - - public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn) - { - return new InputHandler_0_9(ssn); - } - - } - - private static Socket createSocket(String host, int port) - { - try - { - InetAddress address = InetAddress.getByName(host); - Socket socket = new Socket(); - socket.setReuseAddress(true); - socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - - log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); - - socket.setSendBufferSize(writeBufferSize); - socket.setReceiveBufferSize(readBufferSize); - - log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); - - socket.connect(new InetSocketAddress(address, port)); - return socket; - } - catch (SocketException e) - { - throw new TransportException("Error connecting to broker", e); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - } - private SSLContext createSSLContext() throws Exception { String trustStorePath = System.getProperty("javax.net.ssl.trustStore"); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 0f2c0d0226..2206e0999e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -26,16 +26,11 @@ import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; @@ -66,16 +61,12 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; ProtocolEngine _protocolEngine; - private boolean _useNIO = false; private int _processors = 4; - private boolean _executorPool = false; private SSLContextFactory _sslFactory = null; private IoConnector _socketConnector; private IoAcceptor _acceptor; private IoSession _ioSession; private ProtocolEngineFactory _factory; - private boolean _protectIO; - private NetworkDriverConfiguration _config; private Throwable _lastException; private boolean _acceptingConnections = false; @@ -91,21 +82,9 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) + public MINANetworkDriver(int processors, ProtocolEngine protocolEngine, IoSession session) { - _useNIO = useNIO; _processors = processors; - _executorPool = executorPool; - _protectIO = protectIO; - } - - public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO, - ProtocolEngine protocolEngine, IoSession session) - { - _useNIO = useNIO; - _processors = processors; - _executorPool = executorPool; - _protectIO = protectIO; _protocolEngine = protocolEngine; _ioSession = session; _ioSession.setAttachment(_protocolEngine); @@ -132,17 +111,8 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { _factory = factory; - _config = config; - if (_useNIO) - { - _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, - new NewThreadExecutor()); - } - else - { - _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); - } + _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); @@ -207,15 +177,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _sslFactory = sslFactory; } - if (_useNIO) - { - _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); - } - else - { - _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking - // connector - } + _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); String s = ""; @@ -351,39 +313,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { // Configure the session with SSL if necessary SessionUtil.initialize(protocolSession); - if (_executorPool) + if (_sslFactory != null) { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - } - else - { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - } - // Do we want to have read/write buffer limits? - if (_protectIO) - { - //Add IO Protection Filters - IoFilterChain chain = protocolSession.getFilterChain(); - - protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); - writefilter.attach(chain); - - protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); } if (_ioSession == null) @@ -395,7 +328,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { // Set up the protocol engine ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); - MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); + MINANetworkDriver newDriver = new MINANetworkDriver(_processors, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java deleted file mode 100644 index b89eed48b0..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.apache.mina.common.*; - -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.executor.ExecutorFilter; - -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.ConnectionBinding; - -import org.apache.qpid.transport.util.Logger; - -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; - -import static org.apache.qpid.transport.util.Functions.*; - -/** - * MinaHandler - * - * @author Rafael H. Schloming - */ -//RA making this public until we sort out the package issues -public class MinaHandler<E> implements IoHandler -{ - /** Default buffer size for pending messages reads */ - private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; - /** Default buffer size for pending messages writes */ - private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; - private static final int MAX_RCVBUF = 64*1024; - - private static final Logger log = Logger.get(MinaHandler.class); - - static - { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - } - - private final Binding<E,java.nio.ByteBuffer> binding; - - private MinaHandler(Binding<E,java.nio.ByteBuffer> binding) - { - this.binding = binding; - } - - public void messageReceived(IoSession ssn, Object obj) - { - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - ByteBuffer buf = (ByteBuffer) obj; - try - { - attachment.receiver.received(buf.buf()); - } - catch (Throwable t) - { - log.error(t, "exception handling buffer %s", str(buf.buf())); - throw new RuntimeException(t); - } - } - - public void messageSent(IoSession ssn, Object obj) - { - // do nothing - } - - public void exceptionCaught(IoSession ssn, Throwable e) - { - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - attachment.receiver.exception(e); - } - - /** - * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the - * session, which filters the events handled by this handler. The filter chain consists of, handing off events - * to an optional protectio - * - * @param session The MINA session. - * @throws Exception Any underlying exceptions are allowed to fall through to MINA. - */ - public void sessionCreated(IoSession session) throws Exception - { - log.debug("Protocol session created for session " + System.identityHashCode(session)); - - if (Boolean.getBoolean("protectio")) - { - try - { - //Add IO Protection Filters - IoFilterChain chain = session.getFilterChain(); - - session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize( - Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize( - Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); - writefilter.attach(chain); - session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - - log.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - } - - public void sessionOpened(final IoSession ssn) - { - log.debug("opened: %s", this); - E endpoint = binding.endpoint(new MinaSender(ssn)); - Attachment<E> attachment = - new Attachment<E>(endpoint, binding.receiver(endpoint)); - - // We need to synchronize and notify here because the MINA - // connect future returns the session prior to the attachment - // being set. This is arguably a bug in MINA. - synchronized (ssn) - { - ssn.setAttachment(attachment); - ssn.notifyAll(); - } - } - - public void sessionClosed(IoSession ssn) - { - log.debug("closed: %s", ssn); - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - attachment.receiver.closed(); - ssn.setAttachment(null); - } - - public void sessionIdle(IoSession ssn, IdleStatus status) - { - // do nothing - } - - private static class Attachment<E> - { - - E endpoint; - Receiver<java.nio.ByteBuffer> receiver; - - Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver) - { - this.endpoint = endpoint; - this.receiver = receiver; - } - } - - public static final void accept(String host, int port, - Binding<?,java.nio.ByteBuffer> binding) - throws IOException - { - accept(new InetSocketAddress(host, port), binding); - } - - public static final <E> void accept(SocketAddress address, - Binding<E,java.nio.ByteBuffer> binding) - throws IOException - { - IoAcceptor acceptor = new SocketAcceptor(); - acceptor.bind(address, new MinaHandler<E>(binding)); - } - - public static final <E> E connect(String host, int port, - Binding<E,java.nio.ByteBuffer> binding) - { - return connect(new InetSocketAddress(host, port), binding); - } - - public static final <E> E connect(SocketAddress address, - Binding<E,java.nio.ByteBuffer> binding) - { - MinaHandler<E> handler = new MinaHandler<E>(binding); - SocketConnector connector = new SocketConnector(); - IoServiceConfig acceptorConfig = connector.getDefaultConfig(); - acceptorConfig.setThreadModel(ThreadModel.MANUAL); - SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig(); - scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize"); - if (sendBufferSize != null && sendBufferSize > 0) - { - scfg.setSendBufferSize(sendBufferSize); - } - Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize"); - if (receiveBufferSize != null && receiveBufferSize > 0) - { - scfg.setReceiveBufferSize(receiveBufferSize); - } - else if (scfg.getReceiveBufferSize() > MAX_RCVBUF) - { - scfg.setReceiveBufferSize(MAX_RCVBUF); - } - connector.setWorkerTimeout(0); - ConnectFuture cf = connector.connect(address, handler); - cf.join(); - IoSession ssn = cf.getSession(); - - // We need to synchronize and wait here because the MINA - // connect future returns the session prior to the attachment - // being set. This is arguably a bug in MINA. - synchronized (ssn) - { - while (ssn.getAttachment() == null) - { - try - { - ssn.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - return attachment.endpoint; - } - - public static final void accept(String host, int port, - ConnectionDelegate delegate) - throws IOException - { - accept(host, port, ConnectionBinding.get(delegate)); - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate) - { - return connect(host, port, ConnectionBinding.get(delegate)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java deleted file mode 100644 index 22b9c5e784..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; - - -/** - * MinaSender - */ - -public class MinaSender implements Sender<java.nio.ByteBuffer> -{ - private static final int TIMEOUT = 2 * 60 * 1000; - - private final IoSession session; - private WriteFuture lastWrite = null; - - public MinaSender(IoSession session) - { - this.session = session; - } - - public void send(java.nio.ByteBuffer buf) - { - if (session.isClosing()) - { - throw new TransportException("attempted to write to a closed socket"); - } - - synchronized (this) - { - lastWrite = session.write(ByteBuffer.wrap(buf)); - } - } - - public void flush() - { - // pass - } - - public synchronized void close() - { - // MINA will sometimes throw away in-progress writes when you - // ask it to close - synchronized (this) - { - if (lastWrite != null) - { - lastWrite.join(); - } - } - CloseFuture closed = session.close(); - closed.join(); - } - - public void setIdleTimeout(int i) - { - //noop - } - - public long getIdleTimeout() - { - return 0; - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java deleted file mode 100644 index 84e66c25bd..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ /dev/null @@ -1,135 +0,0 @@ -package org.apache.qpid.transport.network.nio; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; - -public class NioHandler implements Runnable -{ - private Receiver<ByteBuffer> _receiver; - private SocketChannel _ch; - private ByteBuffer _readBuf; - private static Map<Long,NioSender> _handlers = new ConcurrentHashMap<Long,NioSender>(); - - private NioHandler(){} - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate) - { - NioHandler handler = new NioHandler(); - return handler.connectInternal(host,port,delegate); - } - - private Connection connectInternal(String host, int port, - ConnectionDelegate delegate) - { - try - { - SocketAddress address = new InetSocketAddress(host,port); - _ch = SocketChannel.open(); - _ch.socket().setReuseAddress(true); - _ch.configureBlocking(true); - _ch.socket().setTcpNoDelay(true); - if (address != null) - { - _ch.socket().connect(address); - } - while (_ch.isConnectionPending()) - { - - } - - } - catch (SocketException e) - { - - e.printStackTrace(); - } - catch (IOException e) - { - e.printStackTrace(); - } - - NioSender sender = new NioSender(_ch); - Connection con = new Connection(); - con.setSender(new Disassembler(sender, 64*1024 - 1)); - con.setConnectionDelegate(delegate); - - _handlers.put(con.getConnectionId(),sender); - - _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR); - - Thread t = new Thread(this); - t.start(); - - return con; - } - - public void run() - { - _readBuf = ByteBuffer.allocate(512); - long read = 0; - while(_ch.isConnected() && _ch.isOpen()) - { - try - { - read = _ch.read(_readBuf); - if (read > 0) - { - _readBuf.flip(); - ByteBuffer b = ByteBuffer.allocate(_readBuf.remaining()); - b.put(_readBuf); - b.flip(); - _readBuf.clear(); - _receiver.received(b); - } - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - //throw new EOFException("The underlying socket/channel has closed"); - } - - public static void startBatchingFrames(int connectionId) - { - NioSender sender = _handlers.get(connectionId); - sender.setStartBatching(); - } - - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java deleted file mode 100644 index 2fa875f279..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java +++ /dev/null @@ -1,126 +0,0 @@ -package org.apache.qpid.transport.network.nio; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -import org.apache.qpid.transport.Sender; - -public class NioSender implements Sender<java.nio.ByteBuffer> -{ - private final Object lock = new Object(); - private SocketChannel _ch; - private boolean _batch = false; - private ByteBuffer _batcher; - - public NioSender(SocketChannel ch) - { - this._ch = ch; - } - - public void send(java.nio.ByteBuffer buf) - { - if (_batch) - { - //System.out.println(_batcher.position() + " , " + buf.remaining() + " , " + buf.position() + ","+_batcher.capacity()); - if (_batcher.position() + buf.remaining() >= _batcher.capacity()) - { - _batcher.flip(); - write(_batcher); - _batcher.clear(); - if (buf.remaining() > _batcher.capacity()) - { - write(buf); - } - else - { - _batcher.put(buf); - } - } - else - { - _batcher.put(buf); - } - } - else - { - write(buf); - } - } - - public void flush() - { - // pass - } - - private void write(java.nio.ByteBuffer buf) - { - synchronized (lock) - { - if( _ch.isConnected() && _ch.isOpen()) - { - try - { - _ch.write(buf); - } - catch(Exception e) - { - e.fillInStackTrace(); - } - } - else - { - throw new RuntimeException("Trying to write on a closed socket"); - } - - } - } - - public void setStartBatching() - { - _batch = true; - _batcher = ByteBuffer.allocate(1024); - } - - public void close() - { - // MINA will sometimes throw away in-progress writes when you - // ask it to close - synchronized (lock) - { - try - { - _ch.close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } - - public void setIdleTimeout(int i) - { - //noop - } -} diff --git a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java deleted file mode 100644 index b93dc46741..0000000000 --- a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.SocketIOTest; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; - -public class IOWriterClient implements Runnable -{ - private static final Logger _logger = LoggerFactory.getLogger(IOWriterClient.class); - - public static int DEFAULT_TEST_SIZE = 2; - - private IoSession _session; - - private long _startTime; - - private long[] _chunkTimes; - - public int _chunkCount = 200000; - - private int _chunkSize = 1024; - - private CountDownLatch _notifier; - - private int _maximumWriteQueueLength; - - static public int _PORT = IOWriterServer._PORT; - - public void run() - { - _logger.info("Starting to send " + _chunkCount + " buffers of " + _chunkSize + "B"); - _startTime = System.currentTimeMillis(); - _notifier = new CountDownLatch(1); - - for (int i = 0; i < _chunkCount; i++) - { - ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); - byte check = (byte) (i % 128); - buf.put(check); - buf.fill((byte) 88, buf.remaining()); - buf.flip(); - - _session.write(buf); - } - - long _sentall = System.currentTimeMillis(); - long _receivedall = _sentall; - try - { - _logger.info("All buffers sent; waiting for receipt from server"); - _notifier.await(); - _receivedall = System.currentTimeMillis(); - } - catch (InterruptedException e) - { - //Ignore - } - _logger.info("Completed"); - _logger.info("Total time waiting for server after last write: " + (_receivedall - _sentall)); - - long totalTime = System.currentTimeMillis() - _startTime; - - _logger.info("Total time: " + totalTime); - _logger.info("MB per second: " + (int) ((1.0 * _chunkSize * _chunkCount) / totalTime)); - long lastChunkTime = _startTime; - double average = 0; - for (int i = 0; i < _chunkTimes.length; i++) - { - if (i == 0) - { - average = _chunkTimes[i] - _startTime; - } - else - { - long delta = _chunkTimes[i] - lastChunkTime; - if (delta != 0) - { - average = (average + delta) / 2; - } - } - lastChunkTime = _chunkTimes[i]; - } - _logger.info("Average chunk time: " + average + "ms"); - _logger.info("Maximum WriteRequestQueue size: " + _maximumWriteQueueLength); - - CloseFuture cf = _session.close(); - _logger.info("Closing session"); - cf.join(); - } - - private class WriterHandler extends IoHandlerAdapter - { - private int _chunksReceived = 0; - - private int _partialBytesRead = 0; - - private byte _partialCheckNumber; - - private int _totalBytesReceived = 0; - - private int _receivedCount = 0; - private int _sentCount = 0; - private static final String DEFAULT_READ_BUFFER = "262144"; - private static final String DEFAULT_WRITE_BUFFER = "262144"; - - public void sessionCreated(IoSession session) throws Exception - { - IoFilterChain chain = session.getFilterChain(); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); - - writefilter.attach(chain); - } - - public void messageSent(IoSession session, Object message) throws Exception - { - _maximumWriteQueueLength = Math.max(session.getScheduledWriteRequests(), _maximumWriteQueueLength); - - if (_logger.isDebugEnabled()) - { - ++_sentCount; - if (_sentCount % 1000 == 0) - { - _logger.debug("Sent count " + _sentCount + ":WQueue" + session.getScheduledWriteRequests()); - - } - } - } - - public void messageReceived(IoSession session, Object message) throws Exception - { - if (_logger.isDebugEnabled()) - { - ++_receivedCount; - - if (_receivedCount % 1000 == 0) - { - _logger.debug("Receieved count " + _receivedCount); - } - } - - ByteBuffer result = (ByteBuffer) message; - _totalBytesReceived += result.remaining(); - int size = result.remaining(); - long now = System.currentTimeMillis(); - if (_partialBytesRead > 0) - { - int offset = _chunkSize - _partialBytesRead; - if (size >= offset) - { - _chunkTimes[_chunksReceived++] = now; - result.position(offset); - } - else - { - // have not read even one chunk, including the previous partial bytes - _partialBytesRead += size; - return; - } - } - - - int chunkCount = result.remaining() / _chunkSize; - - for (int i = 0; i < chunkCount; i++) - { - _chunkTimes[_chunksReceived++] = now; - byte check = result.get(); - _logger.debug("Check number " + check + " read"); - if (check != (byte) ((_chunksReceived - 1) % 128)) - { - _logger.error("Check number " + check + " read when expected " + (_chunksReceived % 128)); - } - _logger.debug("Chunk times recorded"); - - try - { - result.skip(_chunkSize - 1); - } - catch (IllegalArgumentException e) - { - _logger.error("Position was: " + result.position()); - _logger.error("Tried to skip to: " + (_chunkSize * i)); - _logger.error("limit was; " + result.limit()); - } - } - _logger.debug("Chunks received now " + _chunksReceived); - _logger.debug("Bytes received: " + _totalBytesReceived); - _partialBytesRead = result.remaining(); - - if (_partialBytesRead > 0) - { - _partialCheckNumber = result.get(); - } - - - if (_chunksReceived >= _chunkCount) - { - _notifier.countDown(); - } - - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - _logger.error("Error: " + cause, cause); - } - } - - public void startWriter() throws IOException, InterruptedException - { - - _maximumWriteQueueLength = 0; - - IoConnector ioConnector = null; - - if (Boolean.getBoolean("multinio")) - { - _logger.warn("Using MultiThread NIO"); - ioConnector = new org.apache.mina.transport.socket.nio.MultiThreadSocketConnector(); - } - else - { - _logger.warn("Using MINA NIO"); - ioConnector = new org.apache.mina.transport.socket.nio.SocketConnector(); - } - - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getDefaultConfig().getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(32768); - scfg.setReceiveBufferSize(32768); - - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - - - final InetSocketAddress address = new InetSocketAddress("localhost", _PORT); - _logger.info("Attempting connection to " + address); - - //Old mina style -// ioConnector.setHandler(new WriterHandler()); -// ConnectFuture future = ioConnector.connect(address); - ConnectFuture future = ioConnector.connect(address, new WriterHandler()); - // wait for connection to complete - future.join(); - _logger.info("Connection completed"); - // we call getSession which throws an IOException if there has been an error connecting - _session = future.getSession(); - - _chunkTimes = new long[_chunkCount]; - Thread t = new Thread(this); - t.start(); - t.join(); - _logger.info("Test Complete"); - } - - - public void test1k() throws IOException, InterruptedException - { - _logger.info("Starting 1k test"); - _chunkSize = 1024; - startWriter(); - } - - - public void test2k() throws IOException, InterruptedException - { - _logger.info("Starting 2k test"); - _chunkSize = 2048; - startWriter(); - } - - - public void test4k() throws IOException, InterruptedException - { - _logger.info("Starting 4k test"); - _chunkSize = 4096; - startWriter(); - } - - - public void test8k() throws IOException, InterruptedException - { - _logger.info("Starting 8k test"); - _chunkSize = 8192; - startWriter(); - } - - - public void test16k() throws IOException, InterruptedException - { - _logger.info("Starting 16k test"); - _chunkSize = 16384; - startWriter(); - } - - - public void test32k() throws IOException, InterruptedException - { - _logger.info("Starting 32k test"); - _chunkSize = 32768; - startWriter(); - } - - - public static int getIntArg(String[] args, int index, int defaultValue) - { - if (args.length > index) - { - try - { - return Integer.parseInt(args[index]); - } - catch (NumberFormatException e) - { - //Do nothing - } - } - return defaultValue; - } - - public static void main(String[] args) throws IOException, InterruptedException - { - _PORT = getIntArg(args, 0, _PORT); - - int test = getIntArg(args, 1, DEFAULT_TEST_SIZE); - - IOWriterClient w = new IOWriterClient(); - w._chunkCount = getIntArg(args, 2, w._chunkCount); - switch (test) - { - case 0: - w.test1k(); - w.test2k(); - w.test4k(); - w.test8k(); - w.test16k(); - w.test32k(); - break; - case 1: - w.test1k(); - break; - case 2: - w.test2k(); - break; - case 4: - w.test4k(); - break; - case 8: - w.test8k(); - break; - case 16: - w.test16k(); - break; - case 32: - w.test32k(); - break; - } - } -} diff --git a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java deleted file mode 100644 index 423e98c67b..0000000000 --- a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.SocketIOTest; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again. */ -public class IOWriterServer -{ - private static final Logger _logger = LoggerFactory.getLogger(IOWriterServer.class); - - static public int _PORT = 9999; - - private static final String DEFAULT_READ_BUFFER = "262144"; - private static final String DEFAULT_WRITE_BUFFER = "262144"; - - - private static class TestHandler extends IoHandlerAdapter - { - private int _sentCount = 0; - - private int _bytesSent = 0; - - private int _receivedCount = 0; - - public void sessionCreated(IoSession ioSession) throws java.lang.Exception - { - IoFilterChain chain = ioSession.getFilterChain(); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); - - writefilter.attach(chain); - - } - - public void messageReceived(IoSession session, Object message) throws Exception - { - ((ByteBuffer) message).acquire(); - session.write(message); - - if (_logger.isDebugEnabled()) - { - _bytesSent += ((ByteBuffer) message).remaining(); - - _sentCount++; - - if (_sentCount % 1000 == 0) - { - _logger.debug("Bytes sent: " + _bytesSent); - } - } - } - - public void messageSent(IoSession session, Object message) throws Exception - { - if (_logger.isDebugEnabled()) - { - ++_receivedCount; - - if (_receivedCount % 1000 == 0) - { - _logger.debug("Receieved count " + _receivedCount); - } - } - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - _logger.error("Error: " + cause, cause); - } - } - - public void startAcceptor() throws IOException - { - IoAcceptor acceptor; - if (Boolean.getBoolean("multinio")) - { - _logger.warn("Using MultiThread NIO"); - acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(); - } - else - { - _logger.warn("Using MINA NIO"); - acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(); - } - - - SocketSessionConfig sc = (SocketSessionConfig) acceptor.getDefaultConfig().getSessionConfig(); - sc.setTcpNoDelay(true); - sc.setSendBufferSize(32768); - sc.setReceiveBufferSize(32768); - - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - - //The old mina style -// acceptor.setLocalAddress(new InetSocketAddress(_PORT)); -// acceptor.setHandler(new TestHandler()); -// acceptor.bind(); - acceptor.bind(new InetSocketAddress(_PORT), new TestHandler()); - - _logger.info("Bound on port " + _PORT + ":" + _logger.isDebugEnabled()); - _logger.debug("debug on"); - } - - public static void main(String[] args) throws IOException - { - - if (args.length > 0) - { - try - { - _PORT = Integer.parseInt(args[0]); - } - catch (NumberFormatException e) - { - //IGNORE so use default port 9999; - } - } - - IOWriterServer a = new IOWriterServer(); - a.startAcceptor(); - } -} diff --git a/qpid/java/systests/etc/config-systests-firewall-2.xml b/qpid/java/systests/etc/config-systests-firewall-2.xml index 4c1bf9a800..2eedd65d54 100644 --- a/qpid/java/systests/etc/config-systests-firewall-2.xml +++ b/qpid/java/systests/etc/config-systests-firewall-2.xml @@ -35,17 +35,10 @@ <keystorePath>/path/to/keystore.ks</keystorePath> <keystorePassword>keystorepass</keystorePassword> </ssl> - <qpidnio>false</qpidnio> - <protectio> - <enabled>false</enabled> - <readBufferLimitSize>262144</readBufferLimitSize> - <writeBufferLimitSize>262144</writeBufferLimitSize> - </protectio> - <transport>nio</transport> <port>5672</port> <sslport>8672</sslport> - <socketReceiveBuffer>32768</socketReceiveBuffer> - <socketSendBuffer>32768</socketSendBuffer> + <socketReceiveBuffer>262144</socketReceiveBuffer> + <socketSendBuffer>262144</socketSendBuffer> </connector> <management> <enabled>false</enabled> diff --git a/qpid/java/systests/etc/config-systests-firewall-3.xml b/qpid/java/systests/etc/config-systests-firewall-3.xml index 19aaec9e54..fc7d9a4c76 100644 --- a/qpid/java/systests/etc/config-systests-firewall-3.xml +++ b/qpid/java/systests/etc/config-systests-firewall-3.xml @@ -35,17 +35,10 @@ <keystorePath>/path/to/keystore.ks</keystorePath> <keystorePassword>keystorepass</keystorePassword> </ssl> - <qpidnio>false</qpidnio> - <protectio> - <enabled>false</enabled> - <readBufferLimitSize>262144</readBufferLimitSize> - <writeBufferLimitSize>262144</writeBufferLimitSize> - </protectio> - <transport>nio</transport> <port>5672</port> <sslport>8672</sslport> - <socketReceiveBuffer>32768</socketReceiveBuffer> - <socketSendBuffer>32768</socketSendBuffer> + <socketReceiveBuffer>262144</socketReceiveBuffer> + <socketSendBuffer>262144</socketSendBuffer> </connector> <management> <enabled>false</enabled> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java index d4c550bc08..6d379e14d8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java @@ -61,21 +61,6 @@ public class ServerConfigurationFileTest extends QpidBrokerTestCase _serverConfig.getConfig().getProperty(property)); } - public void testProtectIOEnabled() throws ConfigurationException - { - validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED); - } - - public void testProtectIOReadBufferLimitSize() throws ConfigurationException - { - validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE); - } - - public void testProtectIOWriteBufferLimitSize() throws ConfigurationException - { - validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE); - } - public void testStatusUpdates() throws ConfigurationException { validatePropertyDefinedInFile(ServerConfiguration.STATUS_UPDATES); |