diff options
9 files changed, 536 insertions, 414 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 472eaef5b5..b3b3cc1ffd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -59,6 +59,7 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; import java.net.ConnectException; +import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.*; @@ -467,6 +468,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (connectionException.getCause() != null) { message = connectionException.getCause().getMessage(); + connectionException.getCause().printStackTrace(); } else { @@ -486,18 +488,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - AMQException e = new AMQConnectionFailureException(message, connectionException); - - if (connectionException != null) + for (Throwable th = connectionException; th != null; th = th.getCause()) { - if (connectionException instanceof UnresolvedAddressException) + if (th instanceof UnresolvedAddressException || + th instanceof UnknownHostException) { - e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), - null); + throw new AMQUnresolvedAddressException + (message, + _failoverPolicy.getCurrentBrokerDetails().toString(), + connectionException); } - } - throw e; + + throw new AMQConnectionFailureException(message, connectionException); } _connectionMetaData = new QpidConnectionMetaData(this); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 7de7c71d36..f8d5bbcb1c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -43,7 +43,7 @@ import org.apache.qpidity.transport.ConnectionCloseOk; import org.apache.qpidity.transport.TransportConstants; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.SessionDelegate; -import org.apache.qpidity.transport.network.io.IoHandler; +import org.apache.qpidity.transport.network.io.IoTransport; import org.apache.qpidity.transport.network.mina.MinaHandler; import org.apache.qpidity.transport.network.nio.NioHandler; import org.slf4j.Logger; @@ -167,15 +167,16 @@ public class Client implements org.apache.qpidity.nclient.Connection connectionDelegate.setPassword(password); connectionDelegate.setVirtualHost(virtualHost); - if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) + String transport = System.getProperty("transport","io"); + if (transport.equalsIgnoreCase("nio")) { _logger.info("using NIO Transport"); _conn = NioHandler.connect(host, port,connectionDelegate); } - else if (System.getProperty("transport","mina").equalsIgnoreCase("io")) + else if (transport.equalsIgnoreCase("io")) { _logger.info("using Plain IO Transport"); - _conn = IoHandler.connect(host, port,connectionDelegate); + _conn = IoTransport.connect(host, port,connectionDelegate); } else { @@ -287,20 +288,6 @@ public class Client implements org.apache.qpidity.nclient.Connection ssn.attach(ch); ssn.sessionAttach(ssn.getName()); ssn.sessionRequestTimeout(expiryInSeconds); - String transport = System.getProperty("transport","mina"); - - try - { - if (Boolean.getBoolean("batch") && ("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport))) - { - _logger.debug("using batch mode in transport " + transport); - IoHandler.startBatchingFrames(_conn.getConnectionId()); - } - } - catch(Exception e) - { - e.printStackTrace(); - } return ssn; } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index ca572119d9..75dd9fc2ff 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -249,7 +249,7 @@ public class Session extends Invoker } } - protected void invoke(Method m) + public void invoke(Method m) { if (m.getEncodedTrack() == Frame.L4) { diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 48f68e9020..8963f831da 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -21,6 +21,7 @@ package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; @@ -38,53 +39,41 @@ import static org.apache.qpidity.transport.network.InputHandler.State.*; * @author Rafael H. Schloming */ -public class InputHandler implements Receiver<ByteBuffer> +public final class InputHandler implements Receiver<ByteBuffer> { public enum State { PROTO_HDR, - PROTO_HDR_M, - PROTO_HDR_Q, - PROTO_HDR_P, - PROTO_HDR_CLASS, - PROTO_HDR_INSTANCE, - PROTO_HDR_MAJOR, - PROTO_HDR_MINOR, FRAME_HDR, - FRAME_HDR_TYPE, - FRAME_HDR_SIZE1, - FRAME_HDR_SIZE2, - FRAME_HDR_RSVD1, - FRAME_HDR_TRACK, - FRAME_HDR_CH1, - FRAME_HDR_CH2, - FRAME_HDR_RSVD2, - FRAME_HDR_RSVD3, - FRAME_HDR_RSVD4, - FRAME_HDR_RSVD5, - FRAME_FRAGMENT, + FRAME_BODY, ERROR; } private final Receiver<NetworkEvent> receiver; private State state; - - private byte instance; - private byte major; - private byte minor; + private ByteBuffer input = null; + private int needed; private byte flags; private SegmentType type; private byte track; private int channel; - private int size; - private ByteBuffer body; public InputHandler(Receiver<NetworkEvent> receiver, State state) { this.receiver = receiver; this.state = state; + + switch (state) + { + case PROTO_HDR: + needed = 8; + break; + case FRAME_HDR: + needed = Frame.HEADER_SIZE; + break; + } } public InputHandler(Receiver<NetworkEvent> receiver) @@ -92,18 +81,6 @@ public class InputHandler implements Receiver<ByteBuffer> this(receiver, PROTO_HDR); } - private void init() - { - receiver.received(new ProtocolHeader(instance, major, minor)); - } - - private void frame() - { - Frame frame = new Frame(flags, type, track, channel, body); - assert size == frame.getSize(); - receiver.received(frame); - } - private void error(String fmt, Object ... args) { receiver.received(new ProtocolError(Frame.L1, fmt, args)); @@ -111,157 +88,109 @@ public class InputHandler implements Receiver<ByteBuffer> public void received(ByteBuffer buf) { - while (buf.hasRemaining()) + int limit = buf.limit(); + int remaining = buf.remaining(); + while (remaining > 0) { - state = next(buf); + if (remaining >= needed) + { + int consumed = needed; + int pos = buf.position(); + if (input == null) + { + buf.limit(pos + needed); + input = buf; + state = next(pos); + buf.limit(limit); + buf.position(pos + consumed); + } + else + { + buf.limit(pos + needed); + input.put(buf); + buf.limit(limit); + input.flip(); + state = next(0); + } + + remaining -= consumed; + input = null; + } + else + { + if (input == null) + { + input = ByteBuffer.allocate(needed); + } + input.put(buf); + needed -= remaining; + remaining = 0; + } } } - private State next(ByteBuffer buf) + private State next(int pos) { + input.order(ByteOrder.BIG_ENDIAN); + switch (state) { case PROTO_HDR: - return expect(buf, 'A', PROTO_HDR_M); - case PROTO_HDR_M: - return expect(buf, 'M', PROTO_HDR_Q); - case PROTO_HDR_Q: - return expect(buf, 'Q', PROTO_HDR_P); - case PROTO_HDR_P: - return expect(buf, 'P', PROTO_HDR_CLASS); - case PROTO_HDR_CLASS: - return expect(buf, 1, PROTO_HDR_INSTANCE); - case PROTO_HDR_INSTANCE: - instance = buf.get(); - return PROTO_HDR_MAJOR; - case PROTO_HDR_MAJOR: - major = buf.get(); - return PROTO_HDR_MINOR; - case PROTO_HDR_MINOR: - minor = buf.get(); - init(); + if (input.get(pos) != 'A' && + input.get(pos + 1) != 'M' && + input.get(pos + 2) != 'Q' && + input.get(pos + 3) != 'P') + { + error("bad protocol header: %s", str(input)); + return ERROR; + } + + byte instance = input.get(pos + 5); + byte major = input.get(pos + 6); + byte minor = input.get(pos + 7); + receiver.received(new ProtocolHeader(instance, major, minor)); + needed = Frame.HEADER_SIZE; return FRAME_HDR; case FRAME_HDR: - flags = buf.get(); - return FRAME_HDR_TYPE; - case FRAME_HDR_TYPE: - type = SegmentType.get(buf.get()); - return FRAME_HDR_SIZE1; - case FRAME_HDR_SIZE1: - size = (0xFF & buf.get()) << 8; - return FRAME_HDR_SIZE2; - case FRAME_HDR_SIZE2: - size += 0xFF & buf.get(); - size -= 12; + flags = input.get(pos); + type = SegmentType.get(input.get(pos + 1)); + int size = (0xFFFF & input.getShort(pos + 2)); + size -= Frame.HEADER_SIZE; if (size < 0 || size > (64*1024 - 12)) { error("bad frame size: %d", size); return ERROR; } - else - { - return FRAME_HDR_RSVD1; - } - case FRAME_HDR_RSVD1: - return expect(buf, 0, FRAME_HDR_TRACK); - case FRAME_HDR_TRACK: - byte b = buf.get(); + byte b = input.get(pos + 5); if ((b & 0xF0) != 0) { error("non-zero reserved bits in upper nibble of " + "frame header byte 5: '%x'", b); return ERROR; } else { track = (byte) (b & 0xF); - return FRAME_HDR_CH1; } - case FRAME_HDR_CH1: - channel = (0xFF & buf.get()) << 8; - return FRAME_HDR_CH2; - case FRAME_HDR_CH2: - channel += 0xFF & buf.get(); - return FRAME_HDR_RSVD2; - case FRAME_HDR_RSVD2: - return expect(buf, 0, FRAME_HDR_RSVD3); - case FRAME_HDR_RSVD3: - return expect(buf, 0, FRAME_HDR_RSVD4); - case FRAME_HDR_RSVD4: - return expect(buf, 0, FRAME_HDR_RSVD5); - case FRAME_HDR_RSVD5: - if (!expect(buf, 0)) + channel = (0xFFFF & input.getShort(pos + 6)); + if (size == 0) { - return ERROR; - } - - if (size > buf.remaining()) { - body = ByteBuffer.allocate(size); - body.put(buf); - return FRAME_FRAGMENT; - } else { - body = buf.slice(); - body.limit(size); - buf.position(buf.position() + size); - frame(); + Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0)); + receiver.received(frame); + needed = Frame.HEADER_SIZE; return FRAME_HDR; } - case FRAME_FRAGMENT: - int delta = body.remaining(); - if (delta > buf.remaining()) { - body.put(buf); - return FRAME_FRAGMENT; - } else { - ByteBuffer fragment = buf.slice(); - fragment.limit(delta); - buf.position(buf.position() + delta); - body.put(fragment); - body.flip(); - frame(); - return FRAME_HDR; + else + { + needed = size; + return FRAME_BODY; } + case FRAME_BODY: + Frame frame = new Frame(flags, type, track, channel, input.slice()); + receiver.received(frame); + needed = Frame.HEADER_SIZE; + return FRAME_HDR; default: throw new IllegalStateException(); } } - private State expect(ByteBuffer buf, int expected, State next) - { - return expect(buf, (byte) expected, next); - } - - private State expect(ByteBuffer buf, char expected, State next) - { - return expect(buf, (byte) expected, next); - } - - private State expect(ByteBuffer buf, byte expected, State next) - { - if (expect(buf, expected)) - { - return next; - } - else - { - return ERROR; - } - } - - private boolean expect(ByteBuffer buf, int expected) - { - return expect(buf, (byte) expected); - } - - private boolean expect(ByteBuffer buf, byte expected) - { - byte b = buf.get(); - if (b == expected) - { - return true; - } - else - { - error("expecting '%x', got '%x'", expected, b); - return false; - } - } - public void exception(Throwable t) { receiver.exception(t); diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java deleted file mode 100644 index 82fd03857a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpidity.transport.network.io; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.qpidity.transport.Connection; -import org.apache.qpidity.transport.ConnectionDelegate; -import org.apache.qpidity.transport.Receiver; -import org.apache.qpidity.transport.TransportException; -import org.apache.qpidity.transport.network.Assembler; -import org.apache.qpidity.transport.network.Disassembler; -import org.apache.qpidity.transport.network.InputHandler; -import org.apache.qpidity.transport.network.OutputHandler; -import org.apache.qpidity.transport.util.Logger; - -/** - * This class provides a synchronous IO implementation using - * the java.io classes. The IoHandler runs in its own thread. - * The following params are configurable via JVM arguments - * TCP_NO_DELAY - amqj.tcpNoDelay - * SO_RCVBUF - amqj.receiveBufferSize - * SO_SNDBUF - amqj.sendBufferSize - */ -public class IoHandler implements Runnable -{ - private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; - - private Receiver<ByteBuffer> _receiver; - private Socket _socket; - private byte[] _readBuf; - private static AtomicInteger _count = new AtomicInteger(); - private int _readBufferSize; - private int _writeBufferSize; - - private static final Logger log = Logger.get(IoHandler.class); - - private IoHandler() - { - _readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); - _writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate) - { - IoHandler handler = new IoHandler(); - return handler.connectInternal(host,port,delegate); - } - - private Connection connectInternal(String host, int port, - ConnectionDelegate delegate) - { - try - { - InetAddress address = InetAddress.getByName(host); - _socket = new Socket(); - _socket.setReuseAddress(true); - _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - - log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize()); - - _socket.setSendBufferSize(_writeBufferSize); - _socket.setReceiveBufferSize(_readBufferSize); - - log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize()); - - if (address != null) - { - _socket.connect(new InetSocketAddress(address, port)); - } - } - catch (SocketException e) - { - throw new TransportException("Error connecting to broker",e); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker",e); - } - - IoSender sender = new IoSender(_socket); - Connection con = new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), - delegate); - - con.setConnectionId(_count.incrementAndGet()); - _receiver = new InputHandler(new Assembler(con), InputHandler.State.PROTO_HDR); - - Thread t = new Thread(this); - t.setName("IO Handler Thread-" + _count.get()); - t.start(); - - return con; - } - - public void run() - { - // I set the read_buffer size simillar to SO_RCVBUF - // Haven't tested with a lower value to see its better or worse - _readBuf = new byte[_readBufferSize]; - try - { - InputStream in = _socket.getInputStream(); - int read = 0; - while(read != -1) - { - read = in.read(_readBuf); - if (read > 0) - { - ByteBuffer b = ByteBuffer.allocate(read); - b.put(_readBuf,0,read); - b.flip(); - _receiver.received(b); - } - } - } - catch (IOException e) - { - _receiver.exception(new Exception("Error getting input stream from the socket",e)); - } - finally - { - try - { - _socket.close(); - } - catch(Exception e) - { - log.error(e,"Error closing socket"); - } - } - } - - /** - * Will experiment in a future version with batching - */ - public static void startBatchingFrames(int connectionId) - { - - } - - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java new file mode 100644 index 0000000000..426c41dd55 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport.network.io; + +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.TransportException; +import org.apache.qpidity.transport.util.Logger; + +import java.io.InputStream; +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; + +/** + * IoReceiver + * + */ + +final class IoReceiver extends Thread +{ + + private static final Logger log = Logger.get(IoReceiver.class); + + private final IoTransport transport; + private final Receiver<ByteBuffer> receiver; + private final int bufferSize; + private final Socket socket; + + public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize) + { + this.transport = transport; + this.receiver = receiver; + this.bufferSize = bufferSize; + this.socket = transport.getSocket(); + + setName(String.format("IoReceive - %s", socket.getRemoteSocketAddress())); + start(); + } + + public void run() + { + final int threshold = bufferSize / 2; + + // I set the read buffer size simillar to SO_RCVBUF + // Haven't tested with a lower value to see if it's better or worse + byte[] buffer = new byte[bufferSize]; + try + { + InputStream in = socket.getInputStream(); + int read = 0; + int offset = 0; + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + } + } + catch (Throwable t) + { + receiver.exception(new TransportException("error in read thread", t)); + } + finally + { + try + { + transport.getSender().close(); + } + catch (TransportException e) + { + log.error(e, "error closing"); + } + finally + { + receiver.closed(); + } + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java index 511d5d5b9e..c358d3bd3b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java @@ -21,106 +21,243 @@ package org.apache.qpidity.transport.network.io; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpidity.transport.Sender; import org.apache.qpidity.transport.TransportException; +import org.apache.qpidity.transport.util.Logger; -public class IoSender implements Sender<java.nio.ByteBuffer> +import static org.apache.qpidity.transport.util.Functions.*; + + +final class IoSender extends Thread implements Sender<ByteBuffer> { - private final Object lock = new Object(); - private Socket _socket; - private OutputStream _outStream; - public IoSender(Socket socket) + private static final Logger log = Logger.get(IoSender.class); + + // by starting here, we ensure that we always test the wraparound + // case, we should probably make this configurable somehow so that + // we can test other cases as well + private final static int START = Integer.MAX_VALUE - 10; + + private final IoTransport transport; + private final long timeout; + private final Socket socket; + private final OutputStream out; + + private final byte[] buffer; + private final AtomicInteger head = new AtomicInteger(START); + private final AtomicInteger tail = new AtomicInteger(START); + private final Object notFull = new Object(); + private final Object notEmpty = new Object(); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private IOException exception = null; + + + public IoSender(IoTransport transport, int bufferSize, long timeout) { - this._socket = socket; + this.transport = transport; + this.socket = transport.getSocket(); + this.buffer = new byte[bufferSize]; + this.timeout = timeout; + try { - _outStream = _socket.getOutputStream(); + out = socket.getOutputStream(); } - catch(IOException e) + catch (IOException e) { - throw new TransportException("Error getting output stream for socket",e); + throw new TransportException("Error getting output stream for socket", e); } - } - /* - * Currently I don't implement any in memory buffering - * and just write straight to the wire. - * I want to experiment with buffering and see if I can - * get more performance, all though latency will suffer - * a bit. - */ - public void send(java.nio.ByteBuffer buf) - { - write(buf); + setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + start(); } - public void flush() + private static final int mod(int n, int m) { - // pass + int r = n % m; + return r < 0 ? m + r : r; } - /* The extra copying sucks. - * If I know for sure that the buf is backed - * by an array then I could do buf.array() - */ - private void write(java.nio.ByteBuffer buf) + public void send(ByteBuffer buf) { - byte[] array = null; - - if (buf.hasArray()) + if (closed.get()) { - array = buf.array(); - } - else - { - array = new byte[buf.remaining()]; - buf.get(array); + throw new TransportException("sender is closed", exception); } - if( _socket.isConnected()) + final int size = buffer.length; + int remaining = buf.remaining(); + + while (remaining > 0) { - synchronized (lock) + final int hd = head.get(); + final int tl = tail.get(); + + if (hd - tl >= size) { - try + synchronized (notFull) { - _outStream.write(array); + long start = System.currentTimeMillis(); + long elapsed = 0; + while (head.get() - tail.get() >= size && elapsed < timeout) + { + try + { + notFull.wait(timeout - elapsed); + } + catch (InterruptedException e) + { + // pass + } + elapsed += System.currentTimeMillis() - start; + } + + if (head.get() - tail.get() >= size) + { + throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get())); + } } - catch(Exception e) + continue; + } + + final int hd_idx = mod(hd, size); + final int tl_idx = mod(tl, size); + final int length; + + if (tl_idx > hd_idx) + { + length = Math.min(tl_idx - hd_idx, remaining); + } + else + { + length = Math.min(size - hd_idx, remaining); + } + + buf.get(buffer, hd_idx, length); + head.getAndAdd(length); + if (hd == tail.get()) + { + synchronized (notEmpty) { - throw new TransportException("Error trying to write to the socket",e); + notEmpty.notify(); } } - } - else - { - throw new TransportException("Trying to write on a closed socket"); + remaining -= length; } } - /* - * Haven't used this, but the intention is - * to experiment with it in the future. - * Also need to make sure the buffer size - * is configurable - */ - public void setStartBatching() + public void flush() { + // pass } public void close() { - synchronized (lock) + if (closed.getAndSet(true)) + { + synchronized (notEmpty) + { + notEmpty.notify(); + } + + try + { + join(timeout); + if (isAlive()) + { + throw new TransportException("join timed out"); + } + socket.close(); + } + catch (InterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + + if (exception != null) + { + throw new TransportException(exception); + } + } + } + + public void run() + { + final int size = buffer.length; + + while (true) { + final int hd = head.get(); + final int tl = tail.get(); + + if (hd == tl) + { + if (closed.get()) + { + break; + } + + synchronized (notEmpty) + { + while (head.get() == tail.get() && !closed.get()) + { + try + { + notEmpty.wait(); + } + catch (InterruptedException e) + { + // pass + } + } + } + + continue; + } + + final int hd_idx = mod(hd, size); + final int tl_idx = mod(tl, size); + + final int length; + if (tl_idx < hd_idx) + { + length = hd_idx - tl_idx; + } + else + { + length = size - tl_idx; + } + try { - _socket.close(); + out.write(buffer, tl_idx, length); } - catch(Exception e) + catch (IOException e) { - e.printStackTrace(); + log.error(e, "error in read thread"); + exception = e; + closed.set(true); + break; + } + tail.getAndAdd(length); + if (head.get() - tl >= size) + { + synchronized (notFull) + { + notFull.notify(); + } } } } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java new file mode 100644 index 0000000000..07a2c70965 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpidity.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.TransportException; +import org.apache.qpidity.transport.network.Assembler; +import org.apache.qpidity.transport.network.Disassembler; +import org.apache.qpidity.transport.network.InputHandler; +import org.apache.qpidity.transport.network.OutputHandler; +import org.apache.qpidity.transport.util.Logger; + +/** + * This class provides a socket based transport using the java.io + * classes. + * + * The following params are configurable via JVM arguments + * TCP_NO_DELAY - amqj.tcpNoDelay + * SO_RCVBUF - amqj.receiveBufferSize + * SO_SNDBUF - amqj.sendBufferSize + */ +public final class IoTransport +{ + + private static final Logger log = Logger.get(IoTransport.class); + + private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; + + private IoReceiver receiver; + private IoSender sender; + private Socket socket; + private int readBufferSize; + private int writeBufferSize; + private final long timeout = 60000; + + private IoTransport() + { + readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); + writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); + } + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + IoTransport handler = new IoTransport(); + return handler.connectInternal(host,port,delegate); + } + + private Connection connectInternal(String host, int port, + ConnectionDelegate delegate) + { + try + { + InetAddress address = InetAddress.getByName(host); + socket = new Socket(); + socket.setReuseAddress(true); + socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + + log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.setSendBufferSize(writeBufferSize); + socket.setReceiveBufferSize(readBufferSize); + + log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); + log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + + socket.connect(new InetSocketAddress(address, port)); + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + + sender = new IoSender(this, 2*writeBufferSize, timeout); + Connection conn = new Connection + (new Disassembler(new OutputHandler(sender), 64*1024 - 1), + delegate); + receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize); + + return conn; + } + + IoSender getSender() + { + return sender; + } + + IoReceiver getReceiver() + { + return receiver; + } + + Socket getSocket() + { + return socket; + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java index 281b618408..c604aaf3e4 100644 --- a/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java @@ -24,6 +24,7 @@ import org.apache.mina.util.AvailablePortFinder; import org.apache.qpid.util.concurrent.Condition; +import org.apache.qpidity.transport.network.io.IoTransport; import org.apache.qpidity.transport.network.mina.MinaHandler; import org.apache.qpidity.transport.util.Logger; @@ -67,7 +68,7 @@ public class ConnectionTest extends TestCase private Connection connect(final Condition closed) { - Connection conn = MinaHandler.connect("localhost", port, new ConnectionDelegate() + Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate() { public SessionDelegate getSessionDelegate() { |