diff options
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/mina/WriterTest.java')
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/mina/WriterTest.java | 323 |
1 files changed, 164 insertions, 159 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java index ae0b7ea522..910345624f 100644 --- a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java +++ b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java @@ -1,256 +1,256 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.mina; -import junit.framework.JUnit4TestAdapter; import org.apache.log4j.Logger; import org.apache.mina.common.*; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; -public class WriterTest implements Runnable +import junit.framework.TestCase; + +public class WriterTest extends TestCase { private static final Logger _logger = Logger.getLogger(WriterTest.class); - private IoSession _session; + private static class RunnableWriterTest implements Runnable + { + private Logger _logger; - private long _startTime; + private IoSession _session; - private long[] _chunkTimes; + private long _startTime; - private int _chunkCount = 500000; + private long[] _chunkTimes; - private int _chunkSize = 1024; + private int _chunkCount = 500000; - private CountDownLatch _notifier; + private int _chunkSize = 1024; - public void run() - { - _startTime = System.currentTimeMillis(); - _notifier = new CountDownLatch(1); - for (int i = 0; i < _chunkCount; i++) - { - ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); - byte check = (byte) (i % 128); - buf.put(check); - buf.fill((byte)88, buf.remaining()); - buf.flip(); - _session.write(buf); - } + private CountDownLatch _notifier; - try - { - _logger.info("All buffers sent; waiting for receipt from server"); - _notifier.await(); - } - catch (InterruptedException e) + public RunnableWriterTest(Logger logger) { + _logger = logger; } - _logger.info("Completed"); - long totalTime = System.currentTimeMillis() - _startTime; - _logger.info("Total time: " + totalTime); - _logger.info("MB per second: " + (_chunkSize * _chunkCount)/totalTime); - long lastChunkTime = _startTime; - double average = 0; - for (int i = 0; i < _chunkTimes.length; i++) + + public void run() { - if (i == 0) + _startTime = System.currentTimeMillis(); + _notifier = new CountDownLatch(1); + for (int i = 0; i < _chunkCount; i++) + { + ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); + byte check = (byte) (i % 128); + buf.put(check); + buf.fill((byte)88, buf.remaining()); + buf.flip(); + _session.write(buf); + } + + try + { + _logger.info("All buffers sent; waiting for receipt from server"); + _notifier.await(); + } + catch (InterruptedException e) { - average = _chunkTimes[i] - _startTime; } - else + _logger.info("Completed"); + long totalTime = System.currentTimeMillis() - _startTime; + _logger.info("Total time: " + totalTime); + _logger.info("MB per second: " + (_chunkSize * _chunkCount)/totalTime); + long lastChunkTime = _startTime; + double average = 0; + for (int i = 0; i < _chunkTimes.length; i++) { - long delta = _chunkTimes[i] - lastChunkTime; - if (delta != 0) + if (i == 0) + { + average = _chunkTimes[i] - _startTime; + } + else { - average = (average + delta)/2; + long delta = _chunkTimes[i] - lastChunkTime; + if (delta != 0) + { + average = (average + delta)/2; + } } + lastChunkTime = _chunkTimes[i]; } - lastChunkTime = _chunkTimes[i]; + _logger.info("Average chunk time: " + average + "ms"); + CloseFuture cf = _session.close(); + cf.join(); } - _logger.info("Average chunk time: " + average + "ms"); - CloseFuture cf = _session.close(); - cf.join(); - } - private class WriterHandler extends IoHandlerAdapter - { - private int _chunksReceived = 0; + private class WriterHandler extends IoHandlerAdapter + { + private int _chunksReceived = 0; - private int _partialBytesRead = 0; + private int _partialBytesRead = 0; - private byte _partialCheckNumber; + private byte _partialCheckNumber; - private int _totalBytesReceived = 0; + private int _totalBytesReceived = 0; - public void messageReceived(IoSession session, Object message) throws Exception - { - ByteBuffer result = (ByteBuffer) message; - _totalBytesReceived += result.remaining(); - int size = result.remaining(); - long now = System.currentTimeMillis(); - if (_partialBytesRead > 0) + public void messageReceived(IoSession session, Object message) throws Exception { - int offset = _chunkSize - _partialBytesRead; - if (size >= offset) + ByteBuffer result = (ByteBuffer) message; + _totalBytesReceived += result.remaining(); + int size = result.remaining(); + long now = System.currentTimeMillis(); + if (_partialBytesRead > 0) { - _chunkTimes[_chunksReceived++] = now; - result.position(offset); + int offset = _chunkSize - _partialBytesRead; + if (size >= offset) + { + _chunkTimes[_chunksReceived++] = now; + result.position(offset); + } + else + { + // have not read even one chunk, including the previous partial bytes + _partialBytesRead += size; + return; + } } - else - { - // have not read even one chunk, including the previous partial bytes - _partialBytesRead += size; - return; - } - } - int chunkCount = result.remaining()/_chunkSize; + int chunkCount = result.remaining()/_chunkSize; - for (int i = 0; i < chunkCount; i++) - { - _chunkTimes[_chunksReceived++] = now; - byte check = result.get(); - _logger.debug("Check number " + check + " read"); - if (check != (byte)((_chunksReceived - 1)%128)) + for (int i = 0; i < chunkCount; i++) { - _logger.error("Check number " + check + " read when expected " + (_chunksReceived%128)); + _chunkTimes[_chunksReceived++] = now; + byte check = result.get(); + _logger.debug("Check number " + check + " read"); + if (check != (byte)((_chunksReceived - 1)%128)) + { + _logger.error("Check number " + check + " read when expected " + (_chunksReceived%128)); + } + _logger.debug("Chunk times recorded"); + + try + { + result.skip(_chunkSize - 1); + } + catch (IllegalArgumentException e) + { + _logger.error("Position was: " + result.position()); + _logger.error("Tried to skip to: " + (_chunkSize * i)); + _logger.error("limit was; " + result.limit()); + } } - _logger.debug("Chunk times recorded"); + _logger.debug("Chunks received now " + _chunksReceived); + _logger.debug("Bytes received: " + _totalBytesReceived); + _partialBytesRead = result.remaining(); - try + if (_partialBytesRead > 0) { - result.skip(_chunkSize - 1); + _partialCheckNumber = result.get(); } - catch (IllegalArgumentException e) + + if (_chunksReceived >= _chunkCount) { - _logger.error("Position was: " + result.position()); - _logger.error("Tried to skip to: " + (_chunkSize * i)); - _logger.error("limit was; " + result.limit()); + _notifier.countDown(); } - } - _logger.debug("Chunks received now " + _chunksReceived); - _logger.debug("Bytes received: " + _totalBytesReceived); - _partialBytesRead = result.remaining(); - if (_partialBytesRead > 0) - { - _partialCheckNumber = result.get(); } - if (_chunksReceived >= _chunkCount) + public void exceptionCaught(IoSession session, Throwable cause) throws Exception { - _notifier.countDown(); + _logger.error("Error: " + cause, cause); } - } - public void exceptionCaught(IoSession session, Throwable cause) throws Exception + public void startWriter(int chunkSize) throws IOException, InterruptedException { - _logger.error("Error: " + cause, cause); - } - } + _chunkSize = chunkSize; - public void startWriter() throws IOException, InterruptedException - { - IoConnector ioConnector = null; + IoConnector ioConnector = null; - ioConnector = new SocketConnector(); + ioConnector = new SocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - cfg.setThreadModel(ThreadModel.MANUAL); - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(32768); - scfg.setReceiveBufferSize(32768); - - final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT); - _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, new WriterHandler()); - // wait for connection to complete - future.join(); - _logger.info("Connection completed"); - // we call getSession which throws an IOException if there has been an error connecting - _session = future.getSession(); - _chunkTimes = new long[_chunkCount]; - Thread t = new Thread(this); - t.start(); - t.join(); - _logger.info("Test completed"); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + cfg.setThreadModel(ThreadModel.MANUAL); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(32768); + scfg.setReceiveBufferSize(32768); + + final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT); + _logger.info("Attempting connection to " + address); + ConnectFuture future = ioConnector.connect(address, new WriterHandler()); + // wait for connection to complete + future.join(); + _logger.info("Connection completed"); + // we call getSession which throws an IOException if there has been an error connecting + _session = future.getSession(); + _chunkTimes = new long[_chunkCount]; + Thread t = new Thread(this); + t.start(); + t.join(); + _logger.info("Test completed"); + } } - @Test + private RunnableWriterTest _runnableWriterTest = new RunnableWriterTest(_logger); + public void test1k() throws IOException, InterruptedException { _logger.info("Starting 1k test"); - _chunkSize = 1024; - startWriter(); + _runnableWriterTest.startWriter(1024); } - @Test public void test2k() throws IOException, InterruptedException { _logger.info("Starting 2k test"); - _chunkSize = 2048; - startWriter(); + _runnableWriterTest.startWriter(2048); } - @Test public void test4k() throws IOException, InterruptedException { _logger.info("Starting 4k test"); - _chunkSize = 4096; - startWriter(); + _runnableWriterTest.startWriter(4096); } - @Test public void test8k() throws IOException, InterruptedException { _logger.info("Starting 8k test"); - _chunkSize = 8192; - startWriter(); + _runnableWriterTest.startWriter(8192); } - @Test public void test16k() throws IOException, InterruptedException { _logger.info("Starting 16k test"); - _chunkSize = 16384; - startWriter(); + _runnableWriterTest.startWriter(16384); } - @Test public void test32k() throws IOException, InterruptedException { _logger.info("Starting 32k test"); - _chunkSize = 32768; - startWriter(); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(WriterTest.class); + _runnableWriterTest.startWriter(32768); } public static void main(String[] args) throws IOException, InterruptedException @@ -263,4 +263,9 @@ public class WriterTest implements Runnable //w.test16k(); //w.test32k(); } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(WriterTest.class); + } } |