diff options
3 files changed, 355 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index c82fcfa4b0..0bd203a8dd 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.qpidity.nclient; import java.util.List; @@ -25,6 +44,7 @@ import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.TransportConstants; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.SessionDelegate; +import org.apache.qpidity.transport.network.io.IoHandler; import org.apache.qpidity.transport.network.mina.MinaHandler; import org.apache.qpidity.transport.network.nio.NioHandler; import org.slf4j.Logger; @@ -52,6 +72,7 @@ public class Client implements org.apache.qpidity.nclient.Connection public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException { + final Condition negotiationComplete = _lock.newCondition(); closeOk = _lock.newCondition(); _lock.lock(); @@ -122,7 +143,7 @@ public class Client implements org.apache.qpidity.nclient.Connection @Override public void init(Channel ch, ProtocolHeader hdr) { // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and minor=0 :( + // for handling 0.8 protocol version type i.e. major=8 and minor=0 :( if (hdr.getMajor() != TransportConstants.getVersionMajor() || hdr.getMinor() != TransportConstants.getVersionMinor()) { @@ -148,19 +169,18 @@ public class Client implements org.apache.qpidity.nclient.Connection connectionDelegate.setVirtualHost(virtualHost); if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) - { - if( _logger.isDebugEnabled()) - { - _logger.debug("using NIO"); - } + { + _logger.info("using NIO Transport"); _conn = NioHandler.connect(host, port,connectionDelegate); } + else if (System.getProperty("transport","mina").equalsIgnoreCase("io")) + { + _logger.info("using Plain IO Transport"); + _conn = IoHandler.connect(host, port,connectionDelegate); + } else { - if( _logger.isDebugEnabled()) - { - _logger.debug("using MINA"); - } + _logger.info("using MINA Transport"); _conn = MinaHandler.connect(host, port,connectionDelegate); // _conn = NativeHandler.connect(host, port,connectionDelegate); } @@ -260,10 +280,19 @@ public class Client implements org.apache.qpidity.nclient.Connection ssn.attach(ch); ssn.sessionAttach(ssn.getName()); ssn.sessionRequestTimeout(expiryInSeconds); - if (Boolean.getBoolean("batch") && System.getProperty("transport").equalsIgnoreCase("nio")) + String transport = System.getProperty("transport","mina"); + + try + { + if (Boolean.getBoolean("batch") && ("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport))) + { + _logger.debug("using batch mode in transport " + transport); + IoHandler.startBatchingFrames(_conn.getConnectionId()); + } + } + catch(Exception e) { - System.out.println("using batching"); - NioHandler.startBatchingFrames(_conn.getConnectionId()); + e.printStackTrace(); } return ssn; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java new file mode 100644 index 0000000000..50d37f4b10 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpidity.transport.network.io; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.network.Assembler; +import org.apache.qpidity.transport.network.Disassembler; +import org.apache.qpidity.transport.network.InputHandler; +import org.apache.qpidity.transport.network.OutputHandler; +import org.apache.qpidity.transport.util.Logger; + +/** + * This class provides a synchronous IO implementation using + * the java.io classes. The IoHandler runs in its own thread. + * The following params are configurable via JVM arguments + * TCP_NO_DELAY - amqj.tcpNoDelay + * SO_RCVBUF - amqj.receiveBufferSize + * SO_SNDBUF - amqj.sendBufferSize + */ +public class IoHandler implements Runnable +{ + private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; + + private Receiver<ByteBuffer> _receiver; + private Socket _socket; + private byte[] _readBuf; + private static Map<Integer,IoSender> _handlers = new ConcurrentHashMap<Integer,IoSender>(); + private AtomicInteger _count = new AtomicInteger(); + private int _readBufferSize; + private int _writeBufferSize; + + private static final Logger log = Logger.get(IoHandler.class); + + private IoHandler() + { + _readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); + _writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + IoHandler handler = new IoHandler(); + return handler.connectInternal(host,port,delegate); + } + + private Connection connectInternal(String host, int port, + ConnectionDelegate delegate) + { + try + { + InetAddress address = InetAddress.getByName(host); + _socket = new Socket(); + _socket.setReuseAddress(true); + _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + + log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize()); + log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize()); + + _socket.setSendBufferSize(_writeBufferSize); + _socket.setReceiveBufferSize(_readBufferSize); + + log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize()); + log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize()); + + if (address != null) + { + _socket.connect(new InetSocketAddress(address, port)); + } + while (!_socket.isConnected()) + { + + } + + } + catch (SocketException e) + { + log.error(e,"Error connecting to broker"); + } + catch (IOException e) + { + log.error(e,"Error connecting to broker"); + } + + IoSender sender = new IoSender(_socket); + Connection con = new Connection + (new Disassembler(new OutputHandler(sender), 64*1024 - 1), + delegate); + + con.setConnectionId(_count.incrementAndGet()); + _handlers.put(con.getConnectionId(),sender); + + _receiver = new InputHandler(new Assembler(con), InputHandler.State.PROTO_HDR); + + Thread t = new Thread(this); + t.setName("IO Handler Thread-" + _count.get()); + t.start(); + + return con; + } + + public void run() + { + // I set the read buffer size simillar to SO_RCVBUF + // Haven't tested with a lower value to see its better or worse + _readBuf = new byte[_readBufferSize]; + try + { + InputStream in = _socket.getInputStream(); + int read = 0; + while(_socket.isConnected()) + { + try + { + read = in.read(_readBuf); + if (read > 0) + { + ByteBuffer b = ByteBuffer.allocate(read); + b.put(_readBuf,0,read); + b.flip(); + //byte[] temp = new byte[read]; + //System.arraycopy(_readBuf, 0,temp, 0, read); + //ByteBuffer b = ByteBuffer.wrap(temp); + _receiver.received(b); + } + } + catch(Exception e) + { + throw new RuntimeException("Error reading from socket input stream",e); + } + } + } + catch (IOException e) + { + throw new RuntimeException("Error getting input stream from the socket",e); + } + finally + { + try + { + _socket.close(); + } + catch(Exception e) + { + log.error(e,"Error closing socket"); + } + } + } + + public static void startBatchingFrames(int connectionId) + { + IoSender sender = _handlers.get(connectionId); + sender.setStartBatching(); + } + + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java new file mode 100644 index 0000000000..72bfb3c9d4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.transport.network.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.Sender; +import org.apache.qpidity.transport.util.Logger; + +public class IoSender implements Sender<java.nio.ByteBuffer> +{ + private final Object lock = new Object(); + private Socket _socket; + private OutputStream _outStream; + private boolean _batch = false; + private ByteBuffer _buffer; + + private static final Logger log = Logger.get(IoHandler.class); + + public IoSender(Socket socket) + { + this._socket = socket; + try + { + _outStream = _socket.getOutputStream(); + } + catch(IOException e) + { + throw new RuntimeException("Error getting output stream for socket",e); + } + } + + /* + * Currently I don't implement any in memory buffering + * and just write straight to the wire. + * I want to experiment with buffering and see if I can + * get more performance, all though latency will suffer + * a bit. + */ + public void send(java.nio.ByteBuffer buf) + { + write(buf); + } + + /* The extra copying sucks. + * If I know for sure that the buf is backed + * by an array then I could do buf.array() + */ + private void write(java.nio.ByteBuffer buf) + { + byte[] array = new byte[buf.remaining()]; + buf.get(array); + if( _socket.isConnected()) + { + synchronized (lock) + { + try + { + _outStream.write(array); + } + catch(Exception e) + { + e.fillInStackTrace(); + throw new RuntimeException("Error trying to write to the socket",e); + } + } + } + else + { + throw new RuntimeException("Trying to write on a closed socket"); + } + } + + /* + * Haven't used this, but the intention is + * to experiment with it yet. + * Also need to make sure the buffer size + * is configurable + */ + public void setStartBatching() + { + _batch = true; + try + { + _buffer = ByteBuffer.allocate(2048); + } + catch(Exception e) + { + throw new RuntimeException("Unable to set SO_SNDBUF due to socket error",e); + } + } + + public void close() + { + synchronized (lock) + { + try + { + _socket.close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } +} |