diff options
Diffstat (limited to 'trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java')
-rw-r--r-- | trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java | 396 |
1 files changed, 0 insertions, 396 deletions
diff --git a/trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java deleted file mode 100644 index b93dc46741..0000000000 --- a/trunk/qpid/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.SocketIOTest; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; - -public class IOWriterClient implements Runnable -{ - private static final Logger _logger = LoggerFactory.getLogger(IOWriterClient.class); - - public static int DEFAULT_TEST_SIZE = 2; - - private IoSession _session; - - private long _startTime; - - private long[] _chunkTimes; - - public int _chunkCount = 200000; - - private int _chunkSize = 1024; - - private CountDownLatch _notifier; - - private int _maximumWriteQueueLength; - - static public int _PORT = IOWriterServer._PORT; - - public void run() - { - _logger.info("Starting to send " + _chunkCount + " buffers of " + _chunkSize + "B"); - _startTime = System.currentTimeMillis(); - _notifier = new CountDownLatch(1); - - for (int i = 0; i < _chunkCount; i++) - { - ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); - byte check = (byte) (i % 128); - buf.put(check); - buf.fill((byte) 88, buf.remaining()); - buf.flip(); - - _session.write(buf); - } - - long _sentall = System.currentTimeMillis(); - long _receivedall = _sentall; - try - { - _logger.info("All buffers sent; waiting for receipt from server"); - _notifier.await(); - _receivedall = System.currentTimeMillis(); - } - catch (InterruptedException e) - { - //Ignore - } - _logger.info("Completed"); - _logger.info("Total time waiting for server after last write: " + (_receivedall - _sentall)); - - long totalTime = System.currentTimeMillis() - _startTime; - - _logger.info("Total time: " + totalTime); - _logger.info("MB per second: " + (int) ((1.0 * _chunkSize * _chunkCount) / totalTime)); - long lastChunkTime = _startTime; - double average = 0; - for (int i = 0; i < _chunkTimes.length; i++) - { - if (i == 0) - { - average = _chunkTimes[i] - _startTime; - } - else - { - long delta = _chunkTimes[i] - lastChunkTime; - if (delta != 0) - { - average = (average + delta) / 2; - } - } - lastChunkTime = _chunkTimes[i]; - } - _logger.info("Average chunk time: " + average + "ms"); - _logger.info("Maximum WriteRequestQueue size: " + _maximumWriteQueueLength); - - CloseFuture cf = _session.close(); - _logger.info("Closing session"); - cf.join(); - } - - private class WriterHandler extends IoHandlerAdapter - { - private int _chunksReceived = 0; - - private int _partialBytesRead = 0; - - private byte _partialCheckNumber; - - private int _totalBytesReceived = 0; - - private int _receivedCount = 0; - private int _sentCount = 0; - private static final String DEFAULT_READ_BUFFER = "262144"; - private static final String DEFAULT_WRITE_BUFFER = "262144"; - - public void sessionCreated(IoSession session) throws Exception - { - IoFilterChain chain = session.getFilterChain(); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); - - writefilter.attach(chain); - } - - public void messageSent(IoSession session, Object message) throws Exception - { - _maximumWriteQueueLength = Math.max(session.getScheduledWriteRequests(), _maximumWriteQueueLength); - - if (_logger.isDebugEnabled()) - { - ++_sentCount; - if (_sentCount % 1000 == 0) - { - _logger.debug("Sent count " + _sentCount + ":WQueue" + session.getScheduledWriteRequests()); - - } - } - } - - public void messageReceived(IoSession session, Object message) throws Exception - { - if (_logger.isDebugEnabled()) - { - ++_receivedCount; - - if (_receivedCount % 1000 == 0) - { - _logger.debug("Receieved count " + _receivedCount); - } - } - - ByteBuffer result = (ByteBuffer) message; - _totalBytesReceived += result.remaining(); - int size = result.remaining(); - long now = System.currentTimeMillis(); - if (_partialBytesRead > 0) - { - int offset = _chunkSize - _partialBytesRead; - if (size >= offset) - { - _chunkTimes[_chunksReceived++] = now; - result.position(offset); - } - else - { - // have not read even one chunk, including the previous partial bytes - _partialBytesRead += size; - return; - } - } - - - int chunkCount = result.remaining() / _chunkSize; - - for (int i = 0; i < chunkCount; i++) - { - _chunkTimes[_chunksReceived++] = now; - byte check = result.get(); - _logger.debug("Check number " + check + " read"); - if (check != (byte) ((_chunksReceived - 1) % 128)) - { - _logger.error("Check number " + check + " read when expected " + (_chunksReceived % 128)); - } - _logger.debug("Chunk times recorded"); - - try - { - result.skip(_chunkSize - 1); - } - catch (IllegalArgumentException e) - { - _logger.error("Position was: " + result.position()); - _logger.error("Tried to skip to: " + (_chunkSize * i)); - _logger.error("limit was; " + result.limit()); - } - } - _logger.debug("Chunks received now " + _chunksReceived); - _logger.debug("Bytes received: " + _totalBytesReceived); - _partialBytesRead = result.remaining(); - - if (_partialBytesRead > 0) - { - _partialCheckNumber = result.get(); - } - - - if (_chunksReceived >= _chunkCount) - { - _notifier.countDown(); - } - - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - _logger.error("Error: " + cause, cause); - } - } - - public void startWriter() throws IOException, InterruptedException - { - - _maximumWriteQueueLength = 0; - - IoConnector ioConnector = null; - - if (Boolean.getBoolean("multinio")) - { - _logger.warn("Using MultiThread NIO"); - ioConnector = new org.apache.mina.transport.socket.nio.MultiThreadSocketConnector(); - } - else - { - _logger.warn("Using MINA NIO"); - ioConnector = new org.apache.mina.transport.socket.nio.SocketConnector(); - } - - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getDefaultConfig().getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(32768); - scfg.setReceiveBufferSize(32768); - - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - - - final InetSocketAddress address = new InetSocketAddress("localhost", _PORT); - _logger.info("Attempting connection to " + address); - - //Old mina style -// ioConnector.setHandler(new WriterHandler()); -// ConnectFuture future = ioConnector.connect(address); - ConnectFuture future = ioConnector.connect(address, new WriterHandler()); - // wait for connection to complete - future.join(); - _logger.info("Connection completed"); - // we call getSession which throws an IOException if there has been an error connecting - _session = future.getSession(); - - _chunkTimes = new long[_chunkCount]; - Thread t = new Thread(this); - t.start(); - t.join(); - _logger.info("Test Complete"); - } - - - public void test1k() throws IOException, InterruptedException - { - _logger.info("Starting 1k test"); - _chunkSize = 1024; - startWriter(); - } - - - public void test2k() throws IOException, InterruptedException - { - _logger.info("Starting 2k test"); - _chunkSize = 2048; - startWriter(); - } - - - public void test4k() throws IOException, InterruptedException - { - _logger.info("Starting 4k test"); - _chunkSize = 4096; - startWriter(); - } - - - public void test8k() throws IOException, InterruptedException - { - _logger.info("Starting 8k test"); - _chunkSize = 8192; - startWriter(); - } - - - public void test16k() throws IOException, InterruptedException - { - _logger.info("Starting 16k test"); - _chunkSize = 16384; - startWriter(); - } - - - public void test32k() throws IOException, InterruptedException - { - _logger.info("Starting 32k test"); - _chunkSize = 32768; - startWriter(); - } - - - public static int getIntArg(String[] args, int index, int defaultValue) - { - if (args.length > index) - { - try - { - return Integer.parseInt(args[index]); - } - catch (NumberFormatException e) - { - //Do nothing - } - } - return defaultValue; - } - - public static void main(String[] args) throws IOException, InterruptedException - { - _PORT = getIntArg(args, 0, _PORT); - - int test = getIntArg(args, 1, DEFAULT_TEST_SIZE); - - IOWriterClient w = new IOWriterClient(); - w._chunkCount = getIntArg(args, 2, w._chunkCount); - switch (test) - { - case 0: - w.test1k(); - w.test2k(); - w.test4k(); - w.test8k(); - w.test16k(); - w.test32k(); - break; - case 1: - w.test1k(); - break; - case 2: - w.test2k(); - break; - case 4: - w.test4k(); - break; - case 8: - w.test8k(); - break; - case 16: - w.test16k(); - break; - case 32: - w.test32k(); - break; - } - } -} |