summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-07-22 18:33:00 +0000
committerRafael H. Schloming <rhs@apache.org>2008-07-22 18:33:00 +0000
commitda0e38aee411e9bcfa6d0c73811c41e11efc64bf (patch)
treef4862ace3963a310ef5e79e7c0d82409e242aa40
parentcfed390d5d3a7a9d6f76c1bd25f009b630317bdc (diff)
downloadqpid-python-da0e38aee411e9bcfa6d0c73811c41e11efc64bf.tar.gz
Updated the io transport to use a separate write thread with a circular buffer that does opportunistic write batching. Fixed error handling and shutdown for the io transport. Switched default from mina to the io transport for the 0-10 client. Modified InputHandler to accumulate bytes in the outer loop and simplified the state machine accordingly. These changes should address QPID-1188, prevent the Java client from running out of memory when writing messages faster than the network and/or broker can keep up, and in general improve performance.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@678848 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java23
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java247
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java170
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java106
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java251
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java129
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java3
9 files changed, 536 insertions, 414 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 472eaef5b5..b3b3cc1ffd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -59,6 +59,7 @@ import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.*;
@@ -467,6 +468,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (connectionException.getCause() != null)
{
message = connectionException.getCause().getMessage();
+ connectionException.getCause().printStackTrace();
}
else
{
@@ -486,18 +488,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- AMQException e = new AMQConnectionFailureException(message, connectionException);
-
- if (connectionException != null)
+ for (Throwable th = connectionException; th != null; th = th.getCause())
{
- if (connectionException instanceof UnresolvedAddressException)
+ if (th instanceof UnresolvedAddressException ||
+ th instanceof UnknownHostException)
{
- e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
- null);
+ throw new AMQUnresolvedAddressException
+ (message,
+ _failoverPolicy.getCurrentBrokerDetails().toString(),
+ connectionException);
}
-
}
- throw e;
+
+ throw new AMQConnectionFailureException(message, connectionException);
}
_connectionMetaData = new QpidConnectionMetaData(this);
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index 7de7c71d36..f8d5bbcb1c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -43,7 +43,7 @@ import org.apache.qpidity.transport.ConnectionCloseOk;
import org.apache.qpidity.transport.TransportConstants;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.SessionDelegate;
-import org.apache.qpidity.transport.network.io.IoHandler;
+import org.apache.qpidity.transport.network.io.IoTransport;
import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.transport.network.nio.NioHandler;
import org.slf4j.Logger;
@@ -167,15 +167,16 @@ public class Client implements org.apache.qpidity.nclient.Connection
connectionDelegate.setPassword(password);
connectionDelegate.setVirtualHost(virtualHost);
- if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
+ String transport = System.getProperty("transport","io");
+ if (transport.equalsIgnoreCase("nio"))
{
_logger.info("using NIO Transport");
_conn = NioHandler.connect(host, port,connectionDelegate);
}
- else if (System.getProperty("transport","mina").equalsIgnoreCase("io"))
+ else if (transport.equalsIgnoreCase("io"))
{
_logger.info("using Plain IO Transport");
- _conn = IoHandler.connect(host, port,connectionDelegate);
+ _conn = IoTransport.connect(host, port,connectionDelegate);
}
else
{
@@ -287,20 +288,6 @@ public class Client implements org.apache.qpidity.nclient.Connection
ssn.attach(ch);
ssn.sessionAttach(ssn.getName());
ssn.sessionRequestTimeout(expiryInSeconds);
- String transport = System.getProperty("transport","mina");
-
- try
- {
- if (Boolean.getBoolean("batch") && ("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport)))
- {
- _logger.debug("using batch mode in transport " + transport);
- IoHandler.startBatchingFrames(_conn.getConnectionId());
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
return ssn;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index ca572119d9..75dd9fc2ff 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -249,7 +249,7 @@ public class Session extends Invoker
}
}
- protected void invoke(Method m)
+ public void invoke(Method m)
{
if (m.getEncodedTrack() == Frame.L4)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
index 48f68e9020..8963f831da 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpidity.transport.network;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolHeader;
@@ -38,53 +39,41 @@ import static org.apache.qpidity.transport.network.InputHandler.State.*;
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>
+public final class InputHandler implements Receiver<ByteBuffer>
{
public enum State
{
PROTO_HDR,
- PROTO_HDR_M,
- PROTO_HDR_Q,
- PROTO_HDR_P,
- PROTO_HDR_CLASS,
- PROTO_HDR_INSTANCE,
- PROTO_HDR_MAJOR,
- PROTO_HDR_MINOR,
FRAME_HDR,
- FRAME_HDR_TYPE,
- FRAME_HDR_SIZE1,
- FRAME_HDR_SIZE2,
- FRAME_HDR_RSVD1,
- FRAME_HDR_TRACK,
- FRAME_HDR_CH1,
- FRAME_HDR_CH2,
- FRAME_HDR_RSVD2,
- FRAME_HDR_RSVD3,
- FRAME_HDR_RSVD4,
- FRAME_HDR_RSVD5,
- FRAME_FRAGMENT,
+ FRAME_BODY,
ERROR;
}
private final Receiver<NetworkEvent> receiver;
private State state;
-
- private byte instance;
- private byte major;
- private byte minor;
+ private ByteBuffer input = null;
+ private int needed;
private byte flags;
private SegmentType type;
private byte track;
private int channel;
- private int size;
- private ByteBuffer body;
public InputHandler(Receiver<NetworkEvent> receiver, State state)
{
this.receiver = receiver;
this.state = state;
+
+ switch (state)
+ {
+ case PROTO_HDR:
+ needed = 8;
+ break;
+ case FRAME_HDR:
+ needed = Frame.HEADER_SIZE;
+ break;
+ }
}
public InputHandler(Receiver<NetworkEvent> receiver)
@@ -92,18 +81,6 @@ public class InputHandler implements Receiver<ByteBuffer>
this(receiver, PROTO_HDR);
}
- private void init()
- {
- receiver.received(new ProtocolHeader(instance, major, minor));
- }
-
- private void frame()
- {
- Frame frame = new Frame(flags, type, track, channel, body);
- assert size == frame.getSize();
- receiver.received(frame);
- }
-
private void error(String fmt, Object ... args)
{
receiver.received(new ProtocolError(Frame.L1, fmt, args));
@@ -111,157 +88,109 @@ public class InputHandler implements Receiver<ByteBuffer>
public void received(ByteBuffer buf)
{
- while (buf.hasRemaining())
+ int limit = buf.limit();
+ int remaining = buf.remaining();
+ while (remaining > 0)
{
- state = next(buf);
+ if (remaining >= needed)
+ {
+ int consumed = needed;
+ int pos = buf.position();
+ if (input == null)
+ {
+ buf.limit(pos + needed);
+ input = buf;
+ state = next(pos);
+ buf.limit(limit);
+ buf.position(pos + consumed);
+ }
+ else
+ {
+ buf.limit(pos + needed);
+ input.put(buf);
+ buf.limit(limit);
+ input.flip();
+ state = next(0);
+ }
+
+ remaining -= consumed;
+ input = null;
+ }
+ else
+ {
+ if (input == null)
+ {
+ input = ByteBuffer.allocate(needed);
+ }
+ input.put(buf);
+ needed -= remaining;
+ remaining = 0;
+ }
}
}
- private State next(ByteBuffer buf)
+ private State next(int pos)
{
+ input.order(ByteOrder.BIG_ENDIAN);
+
switch (state) {
case PROTO_HDR:
- return expect(buf, 'A', PROTO_HDR_M);
- case PROTO_HDR_M:
- return expect(buf, 'M', PROTO_HDR_Q);
- case PROTO_HDR_Q:
- return expect(buf, 'Q', PROTO_HDR_P);
- case PROTO_HDR_P:
- return expect(buf, 'P', PROTO_HDR_CLASS);
- case PROTO_HDR_CLASS:
- return expect(buf, 1, PROTO_HDR_INSTANCE);
- case PROTO_HDR_INSTANCE:
- instance = buf.get();
- return PROTO_HDR_MAJOR;
- case PROTO_HDR_MAJOR:
- major = buf.get();
- return PROTO_HDR_MINOR;
- case PROTO_HDR_MINOR:
- minor = buf.get();
- init();
+ if (input.get(pos) != 'A' &&
+ input.get(pos + 1) != 'M' &&
+ input.get(pos + 2) != 'Q' &&
+ input.get(pos + 3) != 'P')
+ {
+ error("bad protocol header: %s", str(input));
+ return ERROR;
+ }
+
+ byte instance = input.get(pos + 5);
+ byte major = input.get(pos + 6);
+ byte minor = input.get(pos + 7);
+ receiver.received(new ProtocolHeader(instance, major, minor));
+ needed = Frame.HEADER_SIZE;
return FRAME_HDR;
case FRAME_HDR:
- flags = buf.get();
- return FRAME_HDR_TYPE;
- case FRAME_HDR_TYPE:
- type = SegmentType.get(buf.get());
- return FRAME_HDR_SIZE1;
- case FRAME_HDR_SIZE1:
- size = (0xFF & buf.get()) << 8;
- return FRAME_HDR_SIZE2;
- case FRAME_HDR_SIZE2:
- size += 0xFF & buf.get();
- size -= 12;
+ flags = input.get(pos);
+ type = SegmentType.get(input.get(pos + 1));
+ int size = (0xFFFF & input.getShort(pos + 2));
+ size -= Frame.HEADER_SIZE;
if (size < 0 || size > (64*1024 - 12))
{
error("bad frame size: %d", size);
return ERROR;
}
- else
- {
- return FRAME_HDR_RSVD1;
- }
- case FRAME_HDR_RSVD1:
- return expect(buf, 0, FRAME_HDR_TRACK);
- case FRAME_HDR_TRACK:
- byte b = buf.get();
+ byte b = input.get(pos + 5);
if ((b & 0xF0) != 0) {
error("non-zero reserved bits in upper nibble of " +
"frame header byte 5: '%x'", b);
return ERROR;
} else {
track = (byte) (b & 0xF);
- return FRAME_HDR_CH1;
}
- case FRAME_HDR_CH1:
- channel = (0xFF & buf.get()) << 8;
- return FRAME_HDR_CH2;
- case FRAME_HDR_CH2:
- channel += 0xFF & buf.get();
- return FRAME_HDR_RSVD2;
- case FRAME_HDR_RSVD2:
- return expect(buf, 0, FRAME_HDR_RSVD3);
- case FRAME_HDR_RSVD3:
- return expect(buf, 0, FRAME_HDR_RSVD4);
- case FRAME_HDR_RSVD4:
- return expect(buf, 0, FRAME_HDR_RSVD5);
- case FRAME_HDR_RSVD5:
- if (!expect(buf, 0))
+ channel = (0xFFFF & input.getShort(pos + 6));
+ if (size == 0)
{
- return ERROR;
- }
-
- if (size > buf.remaining()) {
- body = ByteBuffer.allocate(size);
- body.put(buf);
- return FRAME_FRAGMENT;
- } else {
- body = buf.slice();
- body.limit(size);
- buf.position(buf.position() + size);
- frame();
+ Frame frame = new Frame(flags, type, track, channel, ByteBuffer.allocate(0));
+ receiver.received(frame);
+ needed = Frame.HEADER_SIZE;
return FRAME_HDR;
}
- case FRAME_FRAGMENT:
- int delta = body.remaining();
- if (delta > buf.remaining()) {
- body.put(buf);
- return FRAME_FRAGMENT;
- } else {
- ByteBuffer fragment = buf.slice();
- fragment.limit(delta);
- buf.position(buf.position() + delta);
- body.put(fragment);
- body.flip();
- frame();
- return FRAME_HDR;
+ else
+ {
+ needed = size;
+ return FRAME_BODY;
}
+ case FRAME_BODY:
+ Frame frame = new Frame(flags, type, track, channel, input.slice());
+ receiver.received(frame);
+ needed = Frame.HEADER_SIZE;
+ return FRAME_HDR;
default:
throw new IllegalStateException();
}
}
- private State expect(ByteBuffer buf, int expected, State next)
- {
- return expect(buf, (byte) expected, next);
- }
-
- private State expect(ByteBuffer buf, char expected, State next)
- {
- return expect(buf, (byte) expected, next);
- }
-
- private State expect(ByteBuffer buf, byte expected, State next)
- {
- if (expect(buf, expected))
- {
- return next;
- }
- else
- {
- return ERROR;
- }
- }
-
- private boolean expect(ByteBuffer buf, int expected)
- {
- return expect(buf, (byte) expected);
- }
-
- private boolean expect(ByteBuffer buf, byte expected)
- {
- byte b = buf.get();
- if (b == expected)
- {
- return true;
- }
- else
- {
- error("expecting '%x', got '%x'", expected, b);
- return false;
- }
- }
-
public void exception(Throwable t)
{
receiver.exception(t);
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
deleted file mode 100644
index 82fd03857a..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpidity.transport.network.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpidity.transport.Connection;
-import org.apache.qpidity.transport.ConnectionDelegate;
-import org.apache.qpidity.transport.Receiver;
-import org.apache.qpidity.transport.TransportException;
-import org.apache.qpidity.transport.network.Assembler;
-import org.apache.qpidity.transport.network.Disassembler;
-import org.apache.qpidity.transport.network.InputHandler;
-import org.apache.qpidity.transport.network.OutputHandler;
-import org.apache.qpidity.transport.util.Logger;
-
-/**
- * This class provides a synchronous IO implementation using
- * the java.io classes. The IoHandler runs in its own thread.
- * The following params are configurable via JVM arguments
- * TCP_NO_DELAY - amqj.tcpNoDelay
- * SO_RCVBUF - amqj.receiveBufferSize
- * SO_SNDBUF - amqj.sendBufferSize
- */
-public class IoHandler implements Runnable
-{
- private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
-
- private Receiver<ByteBuffer> _receiver;
- private Socket _socket;
- private byte[] _readBuf;
- private static AtomicInteger _count = new AtomicInteger();
- private int _readBufferSize;
- private int _writeBufferSize;
-
- private static final Logger log = Logger.get(IoHandler.class);
-
- private IoHandler()
- {
- _readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
- _writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
- }
-
- public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
- {
- IoHandler handler = new IoHandler();
- return handler.connectInternal(host,port,delegate);
- }
-
- private Connection connectInternal(String host, int port,
- ConnectionDelegate delegate)
- {
- try
- {
- InetAddress address = InetAddress.getByName(host);
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
-
- log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize());
-
- _socket.setSendBufferSize(_writeBufferSize);
- _socket.setReceiveBufferSize(_readBufferSize);
-
- log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize());
-
- if (address != null)
- {
- _socket.connect(new InetSocketAddress(address, port));
- }
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker",e);
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker",e);
- }
-
- IoSender sender = new IoSender(_socket);
- Connection con = new Connection
- (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
- delegate);
-
- con.setConnectionId(_count.incrementAndGet());
- _receiver = new InputHandler(new Assembler(con), InputHandler.State.PROTO_HDR);
-
- Thread t = new Thread(this);
- t.setName("IO Handler Thread-" + _count.get());
- t.start();
-
- return con;
- }
-
- public void run()
- {
- // I set the read_buffer size simillar to SO_RCVBUF
- // Haven't tested with a lower value to see its better or worse
- _readBuf = new byte[_readBufferSize];
- try
- {
- InputStream in = _socket.getInputStream();
- int read = 0;
- while(read != -1)
- {
- read = in.read(_readBuf);
- if (read > 0)
- {
- ByteBuffer b = ByteBuffer.allocate(read);
- b.put(_readBuf,0,read);
- b.flip();
- _receiver.received(b);
- }
- }
- }
- catch (IOException e)
- {
- _receiver.exception(new Exception("Error getting input stream from the socket",e));
- }
- finally
- {
- try
- {
- _socket.close();
- }
- catch(Exception e)
- {
- log.error(e,"Error closing socket");
- }
- }
- }
-
- /**
- * Will experiment in a future version with batching
- */
- public static void startBatchingFrames(int connectionId)
- {
-
- }
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
new file mode 100644
index 0000000000..426c41dd55
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network.io;
+
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.TransportException;
+import org.apache.qpidity.transport.util.Logger;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+/**
+ * IoReceiver
+ *
+ */
+
+final class IoReceiver extends Thread
+{
+
+ private static final Logger log = Logger.get(IoReceiver.class);
+
+ private final IoTransport transport;
+ private final Receiver<ByteBuffer> receiver;
+ private final int bufferSize;
+ private final Socket socket;
+
+ public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize)
+ {
+ this.transport = transport;
+ this.receiver = receiver;
+ this.bufferSize = bufferSize;
+ this.socket = transport.getSocket();
+
+ setName(String.format("IoReceive - %s", socket.getRemoteSocketAddress()));
+ start();
+ }
+
+ public void run()
+ {
+ final int threshold = bufferSize / 2;
+
+ // I set the read buffer size simillar to SO_RCVBUF
+ // Haven't tested with a lower value to see if it's better or worse
+ byte[] buffer = new byte[bufferSize];
+ try
+ {
+ InputStream in = socket.getInputStream();
+ int read = 0;
+ int offset = 0;
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ {
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+ receiver.received(b);
+ offset+=read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[bufferSize];
+ }
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ receiver.exception(new TransportException("error in read thread", t));
+ }
+ finally
+ {
+ try
+ {
+ transport.getSender().close();
+ }
+ catch (TransportException e)
+ {
+ log.error(e, "error closing");
+ }
+ finally
+ {
+ receiver.closed();
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
index 511d5d5b9e..c358d3bd3b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
@@ -21,106 +21,243 @@ package org.apache.qpidity.transport.network.io;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpidity.transport.Sender;
import org.apache.qpidity.transport.TransportException;
+import org.apache.qpidity.transport.util.Logger;
-public class IoSender implements Sender<java.nio.ByteBuffer>
+import static org.apache.qpidity.transport.util.Functions.*;
+
+
+final class IoSender extends Thread implements Sender<ByteBuffer>
{
- private final Object lock = new Object();
- private Socket _socket;
- private OutputStream _outStream;
- public IoSender(Socket socket)
+ private static final Logger log = Logger.get(IoSender.class);
+
+ // by starting here, we ensure that we always test the wraparound
+ // case, we should probably make this configurable somehow so that
+ // we can test other cases as well
+ private final static int START = Integer.MAX_VALUE - 10;
+
+ private final IoTransport transport;
+ private final long timeout;
+ private final Socket socket;
+ private final OutputStream out;
+
+ private final byte[] buffer;
+ private final AtomicInteger head = new AtomicInteger(START);
+ private final AtomicInteger tail = new AtomicInteger(START);
+ private final Object notFull = new Object();
+ private final Object notEmpty = new Object();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private IOException exception = null;
+
+
+ public IoSender(IoTransport transport, int bufferSize, long timeout)
{
- this._socket = socket;
+ this.transport = transport;
+ this.socket = transport.getSocket();
+ this.buffer = new byte[bufferSize];
+ this.timeout = timeout;
+
try
{
- _outStream = _socket.getOutputStream();
+ out = socket.getOutputStream();
}
- catch(IOException e)
+ catch (IOException e)
{
- throw new TransportException("Error getting output stream for socket",e);
+ throw new TransportException("Error getting output stream for socket", e);
}
- }
- /*
- * Currently I don't implement any in memory buffering
- * and just write straight to the wire.
- * I want to experiment with buffering and see if I can
- * get more performance, all though latency will suffer
- * a bit.
- */
- public void send(java.nio.ByteBuffer buf)
- {
- write(buf);
+ setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ start();
}
- public void flush()
+ private static final int mod(int n, int m)
{
- // pass
+ int r = n % m;
+ return r < 0 ? m + r : r;
}
- /* The extra copying sucks.
- * If I know for sure that the buf is backed
- * by an array then I could do buf.array()
- */
- private void write(java.nio.ByteBuffer buf)
+ public void send(ByteBuffer buf)
{
- byte[] array = null;
-
- if (buf.hasArray())
+ if (closed.get())
{
- array = buf.array();
- }
- else
- {
- array = new byte[buf.remaining()];
- buf.get(array);
+ throw new TransportException("sender is closed", exception);
}
- if( _socket.isConnected())
+ final int size = buffer.length;
+ int remaining = buf.remaining();
+
+ while (remaining > 0)
{
- synchronized (lock)
+ final int hd = head.get();
+ final int tl = tail.get();
+
+ if (hd - tl >= size)
{
- try
+ synchronized (notFull)
{
- _outStream.write(array);
+ long start = System.currentTimeMillis();
+ long elapsed = 0;
+ while (head.get() - tail.get() >= size && elapsed < timeout)
+ {
+ try
+ {
+ notFull.wait(timeout - elapsed);
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ elapsed += System.currentTimeMillis() - start;
+ }
+
+ if (head.get() - tail.get() >= size)
+ {
+ throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get()));
+ }
}
- catch(Exception e)
+ continue;
+ }
+
+ final int hd_idx = mod(hd, size);
+ final int tl_idx = mod(tl, size);
+ final int length;
+
+ if (tl_idx > hd_idx)
+ {
+ length = Math.min(tl_idx - hd_idx, remaining);
+ }
+ else
+ {
+ length = Math.min(size - hd_idx, remaining);
+ }
+
+ buf.get(buffer, hd_idx, length);
+ head.getAndAdd(length);
+ if (hd == tail.get())
+ {
+ synchronized (notEmpty)
{
- throw new TransportException("Error trying to write to the socket",e);
+ notEmpty.notify();
}
}
- }
- else
- {
- throw new TransportException("Trying to write on a closed socket");
+ remaining -= length;
}
}
- /*
- * Haven't used this, but the intention is
- * to experiment with it in the future.
- * Also need to make sure the buffer size
- * is configurable
- */
- public void setStartBatching()
+ public void flush()
{
+ // pass
}
public void close()
{
- synchronized (lock)
+ if (closed.getAndSet(true))
+ {
+ synchronized (notEmpty)
+ {
+ notEmpty.notify();
+ }
+
+ try
+ {
+ join(timeout);
+ if (isAlive())
+ {
+ throw new TransportException("join timed out");
+ }
+ socket.close();
+ }
+ catch (InterruptedException e)
+ {
+ throw new TransportException(e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+
+ if (exception != null)
+ {
+ throw new TransportException(exception);
+ }
+ }
+ }
+
+ public void run()
+ {
+ final int size = buffer.length;
+
+ while (true)
{
+ final int hd = head.get();
+ final int tl = tail.get();
+
+ if (hd == tl)
+ {
+ if (closed.get())
+ {
+ break;
+ }
+
+ synchronized (notEmpty)
+ {
+ while (head.get() == tail.get() && !closed.get())
+ {
+ try
+ {
+ notEmpty.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ }
+ }
+
+ continue;
+ }
+
+ final int hd_idx = mod(hd, size);
+ final int tl_idx = mod(tl, size);
+
+ final int length;
+ if (tl_idx < hd_idx)
+ {
+ length = hd_idx - tl_idx;
+ }
+ else
+ {
+ length = size - tl_idx;
+ }
+
try
{
- _socket.close();
+ out.write(buffer, tl_idx, length);
}
- catch(Exception e)
+ catch (IOException e)
{
- e.printStackTrace();
+ log.error(e, "error in read thread");
+ exception = e;
+ closed.set(true);
+ break;
+ }
+ tail.getAndAdd(length);
+ if (head.get() - tl >= size)
+ {
+ synchronized (notFull)
+ {
+ notFull.notify();
+ }
}
}
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
new file mode 100644
index 0000000000..07a2c70965
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpidity.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.TransportException;
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+import org.apache.qpidity.transport.util.Logger;
+
+/**
+ * This class provides a socket based transport using the java.io
+ * classes.
+ *
+ * The following params are configurable via JVM arguments
+ * TCP_NO_DELAY - amqj.tcpNoDelay
+ * SO_RCVBUF - amqj.receiveBufferSize
+ * SO_SNDBUF - amqj.sendBufferSize
+ */
+public final class IoTransport
+{
+
+ private static final Logger log = Logger.get(IoTransport.class);
+
+ private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+
+ private IoReceiver receiver;
+ private IoSender sender;
+ private Socket socket;
+ private int readBufferSize;
+ private int writeBufferSize;
+ private final long timeout = 60000;
+
+ private IoTransport()
+ {
+ readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+ writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+ }
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ IoTransport handler = new IoTransport();
+ return handler.connectInternal(host,port,delegate);
+ }
+
+ private Connection connectInternal(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ try
+ {
+ InetAddress address = InetAddress.getByName(host);
+ socket = new Socket();
+ socket.setReuseAddress(true);
+ socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+
+ log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.setSendBufferSize(writeBufferSize);
+ socket.setReceiveBufferSize(readBufferSize);
+
+ log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+ log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+ socket.connect(new InetSocketAddress(address, port));
+ }
+ catch (SocketException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ Connection conn = new Connection
+ (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+ delegate);
+ receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize);
+
+ return conn;
+ }
+
+ IoSender getSender()
+ {
+ return sender;
+ }
+
+ IoReceiver getReceiver()
+ {
+ return receiver;
+ }
+
+ Socket getSocket()
+ {
+ return socket;
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
index 281b618408..c604aaf3e4 100644
--- a/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
@@ -24,6 +24,7 @@ import org.apache.mina.util.AvailablePortFinder;
import org.apache.qpid.util.concurrent.Condition;
+import org.apache.qpidity.transport.network.io.IoTransport;
import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.transport.util.Logger;
@@ -67,7 +68,7 @@ public class ConnectionTest extends TestCase
private Connection connect(final Condition closed)
{
- Connection conn = MinaHandler.connect("localhost", port, new ConnectionDelegate()
+ Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
{