summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-05-30 03:25:24 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-05-30 03:25:24 +0000
commit162cb3879f3e25cbd13a777b40e374196ab531c9 (patch)
treea91950872bfaece987c9d513bf84ad84b0cc2f29
parent89627d5221b5600751b469f975cbbc228fb75816 (diff)
downloadqpid-python-162cb3879f3e25cbd13a777b40e374196ab531c9.tar.gz
This check in is for QPID-1102.
IoHandler and IoSender uses the java.io classes for IO operations and have shown very good improvement in latency and memory usage over MINA. For certain tests with pub/sub it gives a 2X improvement in throughput. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@661561 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Client.java55
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java187
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java126
3 files changed, 355 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index c82fcfa4b0..0bd203a8dd 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.qpidity.nclient;
import java.util.List;
@@ -25,6 +44,7 @@ import org.apache.qpidity.transport.ConnectionEvent;
import org.apache.qpidity.transport.TransportConstants;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.io.IoHandler;
import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.transport.network.nio.NioHandler;
import org.slf4j.Logger;
@@ -52,6 +72,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
{
+
final Condition negotiationComplete = _lock.newCondition();
closeOk = _lock.newCondition();
_lock.lock();
@@ -122,7 +143,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
@Override public void init(Channel ch, ProtocolHeader hdr)
{
// TODO: once the merge is done we'll need to update this code
- // for handling 0.8 protocol version type i.e. major=8 and minor=0 :(
+ // for handling 0.8 protocol version type i.e. major=8 and minor=0 :(
if (hdr.getMajor() != TransportConstants.getVersionMajor()
|| hdr.getMinor() != TransportConstants.getVersionMinor())
{
@@ -148,19 +169,18 @@ public class Client implements org.apache.qpidity.nclient.Connection
connectionDelegate.setVirtualHost(virtualHost);
if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
- {
- if( _logger.isDebugEnabled())
- {
- _logger.debug("using NIO");
- }
+ {
+ _logger.info("using NIO Transport");
_conn = NioHandler.connect(host, port,connectionDelegate);
}
+ else if (System.getProperty("transport","mina").equalsIgnoreCase("io"))
+ {
+ _logger.info("using Plain IO Transport");
+ _conn = IoHandler.connect(host, port,connectionDelegate);
+ }
else
{
- if( _logger.isDebugEnabled())
- {
- _logger.debug("using MINA");
- }
+ _logger.info("using MINA Transport");
_conn = MinaHandler.connect(host, port,connectionDelegate);
// _conn = NativeHandler.connect(host, port,connectionDelegate);
}
@@ -260,10 +280,19 @@ public class Client implements org.apache.qpidity.nclient.Connection
ssn.attach(ch);
ssn.sessionAttach(ssn.getName());
ssn.sessionRequestTimeout(expiryInSeconds);
- if (Boolean.getBoolean("batch") && System.getProperty("transport").equalsIgnoreCase("nio"))
+ String transport = System.getProperty("transport","mina");
+
+ try
+ {
+ if (Boolean.getBoolean("batch") && ("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport)))
+ {
+ _logger.debug("using batch mode in transport " + transport);
+ IoHandler.startBatchingFrames(_conn.getConnectionId());
+ }
+ }
+ catch(Exception e)
{
- System.out.println("using batching");
- NioHandler.startBatchingFrames(_conn.getConnectionId());
+ e.printStackTrace();
}
return ssn;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
new file mode 100644
index 0000000000..50d37f4b10
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpidity.transport.network.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+import org.apache.qpidity.transport.util.Logger;
+
+/**
+ * This class provides a synchronous IO implementation using
+ * the java.io classes. The IoHandler runs in its own thread.
+ * The following params are configurable via JVM arguments
+ * TCP_NO_DELAY - amqj.tcpNoDelay
+ * SO_RCVBUF - amqj.receiveBufferSize
+ * SO_SNDBUF - amqj.sendBufferSize
+ */
+public class IoHandler implements Runnable
+{
+ private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+
+ private Receiver<ByteBuffer> _receiver;
+ private Socket _socket;
+ private byte[] _readBuf;
+ private static Map<Integer,IoSender> _handlers = new ConcurrentHashMap<Integer,IoSender>();
+ private AtomicInteger _count = new AtomicInteger();
+ private int _readBufferSize;
+ private int _writeBufferSize;
+
+ private static final Logger log = Logger.get(IoHandler.class);
+
+ private IoHandler()
+ {
+ _readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+ _writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+ }
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ IoHandler handler = new IoHandler();
+ return handler.connectInternal(host,port,delegate);
+ }
+
+ private Connection connectInternal(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ try
+ {
+ InetAddress address = InetAddress.getByName(host);
+ _socket = new Socket();
+ _socket.setReuseAddress(true);
+ _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+
+ log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize());
+
+ _socket.setSendBufferSize(_writeBufferSize);
+ _socket.setReceiveBufferSize(_readBufferSize);
+
+ log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize());
+
+ if (address != null)
+ {
+ _socket.connect(new InetSocketAddress(address, port));
+ }
+ while (!_socket.isConnected())
+ {
+
+ }
+
+ }
+ catch (SocketException e)
+ {
+ log.error(e,"Error connecting to broker");
+ }
+ catch (IOException e)
+ {
+ log.error(e,"Error connecting to broker");
+ }
+
+ IoSender sender = new IoSender(_socket);
+ Connection con = new Connection
+ (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+ delegate);
+
+ con.setConnectionId(_count.incrementAndGet());
+ _handlers.put(con.getConnectionId(),sender);
+
+ _receiver = new InputHandler(new Assembler(con), InputHandler.State.PROTO_HDR);
+
+ Thread t = new Thread(this);
+ t.setName("IO Handler Thread-" + _count.get());
+ t.start();
+
+ return con;
+ }
+
+ public void run()
+ {
+ // I set the read buffer size simillar to SO_RCVBUF
+ // Haven't tested with a lower value to see its better or worse
+ _readBuf = new byte[_readBufferSize];
+ try
+ {
+ InputStream in = _socket.getInputStream();
+ int read = 0;
+ while(_socket.isConnected())
+ {
+ try
+ {
+ read = in.read(_readBuf);
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.allocate(read);
+ b.put(_readBuf,0,read);
+ b.flip();
+ //byte[] temp = new byte[read];
+ //System.arraycopy(_readBuf, 0,temp, 0, read);
+ //ByteBuffer b = ByteBuffer.wrap(temp);
+ _receiver.received(b);
+ }
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Error reading from socket input stream",e);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error getting input stream from the socket",e);
+ }
+ finally
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(Exception e)
+ {
+ log.error(e,"Error closing socket");
+ }
+ }
+ }
+
+ public static void startBatchingFrames(int connectionId)
+ {
+ IoSender sender = _handlers.get(connectionId);
+ sender.setStartBatching();
+ }
+
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
new file mode 100644
index 0000000000..72bfb3c9d4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.transport.network.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.Sender;
+import org.apache.qpidity.transport.util.Logger;
+
+public class IoSender implements Sender<java.nio.ByteBuffer>
+{
+ private final Object lock = new Object();
+ private Socket _socket;
+ private OutputStream _outStream;
+ private boolean _batch = false;
+ private ByteBuffer _buffer;
+
+ private static final Logger log = Logger.get(IoHandler.class);
+
+ public IoSender(Socket socket)
+ {
+ this._socket = socket;
+ try
+ {
+ _outStream = _socket.getOutputStream();
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException("Error getting output stream for socket",e);
+ }
+ }
+
+ /*
+ * Currently I don't implement any in memory buffering
+ * and just write straight to the wire.
+ * I want to experiment with buffering and see if I can
+ * get more performance, all though latency will suffer
+ * a bit.
+ */
+ public void send(java.nio.ByteBuffer buf)
+ {
+ write(buf);
+ }
+
+ /* The extra copying sucks.
+ * If I know for sure that the buf is backed
+ * by an array then I could do buf.array()
+ */
+ private void write(java.nio.ByteBuffer buf)
+ {
+ byte[] array = new byte[buf.remaining()];
+ buf.get(array);
+ if( _socket.isConnected())
+ {
+ synchronized (lock)
+ {
+ try
+ {
+ _outStream.write(array);
+ }
+ catch(Exception e)
+ {
+ e.fillInStackTrace();
+ throw new RuntimeException("Error trying to write to the socket",e);
+ }
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Trying to write on a closed socket");
+ }
+ }
+
+ /*
+ * Haven't used this, but the intention is
+ * to experiment with it yet.
+ * Also need to make sure the buffer size
+ * is configurable
+ */
+ public void setStartBatching()
+ {
+ _batch = true;
+ try
+ {
+ _buffer = ByteBuffer.allocate(2048);
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Unable to set SO_SNDBUF due to socket error",e);
+ }
+ }
+
+ public void close()
+ {
+ synchronized (lock)
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+}