From 475ae3ef495c9c77efe4da7c4806e95d8fa20ce5 Mon Sep 17 00:00:00 2001 From: David Mollitor Date: Mon, 5 Oct 2020 20:14:43 -0400 Subject: Changed Read/Write ByteBuffer to more closely match ReadableByteChannel/WriteableByteChannel --- .../org/apache/thrift/async/TAsyncMethodCall.java | 16 +++++++-------- .../thrift/server/AbstractNonblockingServer.java | 8 ++++---- .../thrift/transport/TNonblockingSocket.java | 23 ++++++++++++--------- .../thrift/transport/TNonblockingTransport.java | 4 ---- .../org/apache/thrift/transport/TTransport.java | 24 +++++++++++++++++++++- .../apache/thrift/transport/sasl/FrameWriter.java | 3 ++- .../transport/sasl/NonblockingSaslHandler.java | 9 ++++---- 7 files changed, 54 insertions(+), 33 deletions(-) diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index d5c608d87..a4e51cd36 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -217,9 +217,9 @@ public abstract class TAsyncMethodCall { state = State.ERROR; } - private void doReadingResponseBody(SelectionKey key) throws IOException { + private void doReadingResponseBody(SelectionKey key) throws TTransportException { if (transport.read(frameBuffer) < 0) { - throw new IOException("Read call frame failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed"); } if (frameBuffer.remaining() == 0) { cleanUpAndFireCallback(key); @@ -241,9 +241,9 @@ public abstract class TAsyncMethodCall { } } - private void doReadingResponseSize() throws IOException { + private void doReadingResponseSize() throws TTransportException { if (transport.read(sizeBuffer) < 0) { - throw new IOException("Read call frame size failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed"); } if (sizeBuffer.remaining() == 0) { state = State.READING_RESPONSE_BODY; @@ -251,9 +251,9 @@ public abstract class TAsyncMethodCall { } } - private void doWritingRequestBody(SelectionKey key) throws IOException { + private void doWritingRequestBody(SelectionKey key) throws TTransportException { if (transport.write(frameBuffer) < 0) { - throw new IOException("Write call frame failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed"); } if (frameBuffer.remaining() == 0) { if (isOneway) { @@ -266,9 +266,9 @@ public abstract class TAsyncMethodCall { } } - private void doWritingRequestSize() throws IOException { + private void doWritingRequestSize() throws TTransportException { if (transport.write(sizeBuffer) < 0) { - throw new IOException("Write call frame size failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed"); } if (sizeBuffer.remaining() == 0) { state = State.WRITING_REQUEST_BODY; diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index 4aae803f2..f91e8254f 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -414,8 +414,8 @@ public abstract class AbstractNonblockingServer extends TServer { if (trans_.write(buffer_) < 0) { return false; } - } catch (IOException e) { - LOGGER.warn("Got an IOException during write!", e); + } catch (TTransportException e) { + LOGGER.warn("Got an Exception during write", e); return false; } @@ -543,8 +543,8 @@ public abstract class AbstractNonblockingServer extends TServer { private boolean internalRead() { try { return trans_.read(buffer_) >= 0; - } catch (IOException e) { - LOGGER.warn("Got an IOException in internalRead!", e); + } catch (TTransportException e) { + LOGGER.warn("Got an Exception in internalRead", e); return false; } } diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java index 76ed02cbb..13c858648 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -144,11 +144,14 @@ public class TNonblockingSocket extends TNonblockingTransport { /** * Perform a nonblocking read into buffer. */ - public int read(ByteBuffer buffer) throws IOException { - return socketChannel_.read(buffer); + public int read(ByteBuffer buffer) throws TTransportException { + try { + return socketChannel_.read(buffer); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } } - /** * Reads from the underlying input stream if not null. */ @@ -167,8 +170,12 @@ public class TNonblockingSocket extends TNonblockingTransport { /** * Perform a nonblocking write of the data in buffer; */ - public int write(ByteBuffer buffer) throws IOException { - return socketChannel_.write(buffer); + public int write(ByteBuffer buffer) throws TTransportException { + try { + return socketChannel_.write(buffer); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } } /** @@ -179,11 +186,7 @@ public class TNonblockingSocket extends TNonblockingTransport { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to write-only socket channel"); } - try { - socketChannel_.write(ByteBuffer.wrap(buf, off, len)); - } catch (IOException iox) { - throw new TTransportException(TTransportException.UNKNOWN, iox); - } + write(ByteBuffer.wrap(buf, off, len)); } /** diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java index 255595d6c..30ec9d25c 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -23,7 +23,6 @@ import org.apache.thrift.TConfiguration; import java.io.IOException; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -47,7 +46,4 @@ public abstract class TNonblockingTransport extends TEndpointTransport { public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; - public abstract int read(ByteBuffer buffer) throws IOException; - - public abstract int write(ByteBuffer buffer) throws IOException; } diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java index 3811acd7e..992ad1661 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -60,6 +60,26 @@ public abstract class TTransport implements Closeable { */ public abstract void close(); + /** + * Reads a sequence of bytes from this channel into the given buffer. An + * attempt is made to read up to the number of bytes remaining in the buffer, + * that is, dst.remaining(), at the moment this method is invoked. Upon return + * the buffer's position will move forward the number of bytes read; its limit + * will not have changed. Subclasses are encouraged to provide a more + * efficient implementation of this method. + * + * @param dst The buffer into which bytes are to be transferred + * @return The number of bytes read, possibly zero, or -1 if the channel has + * reached end-of-stream + * @throws TTransportException if there was an error reading data + */ + public int read(ByteBuffer dst) throws TTransportException { + byte[] arr = new byte[dst.remaining()]; + this.readAll(arr, 0, arr.length); + dst.put(arr); + return arr.length; + } + /** * Reads up to len bytes into buffer buf, starting at offset off. * @@ -129,12 +149,14 @@ public abstract class TTransport implements Closeable { * implementation of this method. * * @param src The buffer from which bytes are to be retrieved + * @return The number of bytes written, possibly zero * @throws TTransportException if there was an error writing data */ - public void write(ByteBuffer src) throws TTransportException { + public int write(ByteBuffer src) throws TTransportException { byte[] arr = new byte[src.remaining()]; src.get(arr); write(arr); + return arr.length; } /** diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java index e5feba01f..df1d93230 100644 --- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java +++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; /** * Write frame (header and payload) to transport in a nonblocking way. @@ -101,7 +102,7 @@ public abstract class FrameWriter { * * @throws IOException */ - public void write(TNonblockingTransport transport) throws IOException { + public void write(TNonblockingTransport transport) throws TTransportException { transport.write(frameBytes); } diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java index 45571469b..d73c3ec18 100644 --- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java +++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java @@ -19,7 +19,6 @@ package org.apache.thrift.transport.sasl; -import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.charset.StandardCharsets; @@ -364,7 +363,7 @@ public class NonblockingSaslHandler { saslChallenge.clear(); nextPhase = Phase.READING_SASL_RESPONSE; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } @@ -378,7 +377,7 @@ public class NonblockingSaslHandler { saslResponse = null; nextPhase = Phase.READING_REQUEST; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } @@ -389,7 +388,7 @@ public class NonblockingSaslHandler { if (saslChallenge.isComplete()) { nextPhase = Phase.CLOSING; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } @@ -401,7 +400,7 @@ public class NonblockingSaslHandler { responseWriter.clear(); nextPhase = Phase.READING_REQUEST; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } -- cgit v1.2.1