diff options
Diffstat (limited to 'qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java')
-rw-r--r-- | qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java | 396 |
1 files changed, 396 insertions, 0 deletions
diff --git a/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java new file mode 100644 index 0000000000..b93dc46741 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java @@ -0,0 +1,396 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.SocketIOTest; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; + +public class IOWriterClient implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(IOWriterClient.class); + + public static int DEFAULT_TEST_SIZE = 2; + + private IoSession _session; + + private long _startTime; + + private long[] _chunkTimes; + + public int _chunkCount = 200000; + + private int _chunkSize = 1024; + + private CountDownLatch _notifier; + + private int _maximumWriteQueueLength; + + static public int _PORT = IOWriterServer._PORT; + + public void run() + { + _logger.info("Starting to send " + _chunkCount + " buffers of " + _chunkSize + "B"); + _startTime = System.currentTimeMillis(); + _notifier = new CountDownLatch(1); + + for (int i = 0; i < _chunkCount; i++) + { + ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); + byte check = (byte) (i % 128); + buf.put(check); + buf.fill((byte) 88, buf.remaining()); + buf.flip(); + + _session.write(buf); + } + + long _sentall = System.currentTimeMillis(); + long _receivedall = _sentall; + try + { + _logger.info("All buffers sent; waiting for receipt from server"); + _notifier.await(); + _receivedall = System.currentTimeMillis(); + } + catch (InterruptedException e) + { + //Ignore + } + _logger.info("Completed"); + _logger.info("Total time waiting for server after last write: " + (_receivedall - _sentall)); + + long totalTime = System.currentTimeMillis() - _startTime; + + _logger.info("Total time: " + totalTime); + _logger.info("MB per second: " + (int) ((1.0 * _chunkSize * _chunkCount) / totalTime)); + long lastChunkTime = _startTime; + double average = 0; + for (int i = 0; i < _chunkTimes.length; i++) + { + if (i == 0) + { + average = _chunkTimes[i] - _startTime; + } + else + { + long delta = _chunkTimes[i] - lastChunkTime; + if (delta != 0) + { + average = (average + delta) / 2; + } + } + lastChunkTime = _chunkTimes[i]; + } + _logger.info("Average chunk time: " + average + "ms"); + _logger.info("Maximum WriteRequestQueue size: " + _maximumWriteQueueLength); + + CloseFuture cf = _session.close(); + _logger.info("Closing session"); + cf.join(); + } + + private class WriterHandler extends IoHandlerAdapter + { + private int _chunksReceived = 0; + + private int _partialBytesRead = 0; + + private byte _partialCheckNumber; + + private int _totalBytesReceived = 0; + + private int _receivedCount = 0; + private int _sentCount = 0; + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; + + public void sessionCreated(IoSession session) throws Exception + { + IoFilterChain chain = session.getFilterChain(); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); + + writefilter.attach(chain); + } + + public void messageSent(IoSession session, Object message) throws Exception + { + _maximumWriteQueueLength = Math.max(session.getScheduledWriteRequests(), _maximumWriteQueueLength); + + if (_logger.isDebugEnabled()) + { + ++_sentCount; + if (_sentCount % 1000 == 0) + { + _logger.debug("Sent count " + _sentCount + ":WQueue" + session.getScheduledWriteRequests()); + + } + } + } + + public void messageReceived(IoSession session, Object message) throws Exception + { + if (_logger.isDebugEnabled()) + { + ++_receivedCount; + + if (_receivedCount % 1000 == 0) + { + _logger.debug("Receieved count " + _receivedCount); + } + } + + ByteBuffer result = (ByteBuffer) message; + _totalBytesReceived += result.remaining(); + int size = result.remaining(); + long now = System.currentTimeMillis(); + if (_partialBytesRead > 0) + { + int offset = _chunkSize - _partialBytesRead; + if (size >= offset) + { + _chunkTimes[_chunksReceived++] = now; + result.position(offset); + } + else + { + // have not read even one chunk, including the previous partial bytes + _partialBytesRead += size; + return; + } + } + + + int chunkCount = result.remaining() / _chunkSize; + + for (int i = 0; i < chunkCount; i++) + { + _chunkTimes[_chunksReceived++] = now; + byte check = result.get(); + _logger.debug("Check number " + check + " read"); + if (check != (byte) ((_chunksReceived - 1) % 128)) + { + _logger.error("Check number " + check + " read when expected " + (_chunksReceived % 128)); + } + _logger.debug("Chunk times recorded"); + + try + { + result.skip(_chunkSize - 1); + } + catch (IllegalArgumentException e) + { + _logger.error("Position was: " + result.position()); + _logger.error("Tried to skip to: " + (_chunkSize * i)); + _logger.error("limit was; " + result.limit()); + } + } + _logger.debug("Chunks received now " + _chunksReceived); + _logger.debug("Bytes received: " + _totalBytesReceived); + _partialBytesRead = result.remaining(); + + if (_partialBytesRead > 0) + { + _partialCheckNumber = result.get(); + } + + + if (_chunksReceived >= _chunkCount) + { + _notifier.countDown(); + } + + } + + public void exceptionCaught(IoSession session, Throwable cause) throws Exception + { + _logger.error("Error: " + cause, cause); + } + } + + public void startWriter() throws IOException, InterruptedException + { + + _maximumWriteQueueLength = 0; + + IoConnector ioConnector = null; + + if (Boolean.getBoolean("multinio")) + { + _logger.warn("Using MultiThread NIO"); + ioConnector = new org.apache.mina.transport.socket.nio.MultiThreadSocketConnector(); + } + else + { + _logger.warn("Using MINA NIO"); + ioConnector = new org.apache.mina.transport.socket.nio.SocketConnector(); + } + + SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getDefaultConfig().getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(32768); + scfg.setReceiveBufferSize(32768); + + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + + + final InetSocketAddress address = new InetSocketAddress("localhost", _PORT); + _logger.info("Attempting connection to " + address); + + //Old mina style +// ioConnector.setHandler(new WriterHandler()); +// ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, new WriterHandler()); + // wait for connection to complete + future.join(); + _logger.info("Connection completed"); + // we call getSession which throws an IOException if there has been an error connecting + _session = future.getSession(); + + _chunkTimes = new long[_chunkCount]; + Thread t = new Thread(this); + t.start(); + t.join(); + _logger.info("Test Complete"); + } + + + public void test1k() throws IOException, InterruptedException + { + _logger.info("Starting 1k test"); + _chunkSize = 1024; + startWriter(); + } + + + public void test2k() throws IOException, InterruptedException + { + _logger.info("Starting 2k test"); + _chunkSize = 2048; + startWriter(); + } + + + public void test4k() throws IOException, InterruptedException + { + _logger.info("Starting 4k test"); + _chunkSize = 4096; + startWriter(); + } + + + public void test8k() throws IOException, InterruptedException + { + _logger.info("Starting 8k test"); + _chunkSize = 8192; + startWriter(); + } + + + public void test16k() throws IOException, InterruptedException + { + _logger.info("Starting 16k test"); + _chunkSize = 16384; + startWriter(); + } + + + public void test32k() throws IOException, InterruptedException + { + _logger.info("Starting 32k test"); + _chunkSize = 32768; + startWriter(); + } + + + public static int getIntArg(String[] args, int index, int defaultValue) + { + if (args.length > index) + { + try + { + return Integer.parseInt(args[index]); + } + catch (NumberFormatException e) + { + //Do nothing + } + } + return defaultValue; + } + + public static void main(String[] args) throws IOException, InterruptedException + { + _PORT = getIntArg(args, 0, _PORT); + + int test = getIntArg(args, 1, DEFAULT_TEST_SIZE); + + IOWriterClient w = new IOWriterClient(); + w._chunkCount = getIntArg(args, 2, w._chunkCount); + switch (test) + { + case 0: + w.test1k(); + w.test2k(); + w.test4k(); + w.test8k(); + w.test16k(); + w.test32k(); + break; + case 1: + w.test1k(); + break; + case 2: + w.test2k(); + break; + case 4: + w.test4k(); + break; + case 8: + w.test8k(); + break; + case 16: + w.test16k(); + break; + case 32: + w.test32k(); + break; + } + } +} |