diff options
-rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java | 33 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java | 22 |
2 files changed, 27 insertions, 28 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java index 568526d9db..82fd03857a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpidity.transport.Connection; import org.apache.qpidity.transport.ConnectionDelegate; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.TransportException; import org.apache.qpidity.transport.network.Assembler; import org.apache.qpidity.transport.network.Disassembler; import org.apache.qpidity.transport.network.InputHandler; @@ -94,19 +95,14 @@ public class IoHandler implements Runnable { _socket.connect(new InetSocketAddress(address, port)); } - while (!_socket.isConnected()) - { - - } - } catch (SocketException e) { - throw new RuntimeException("Error connecting to broker",e); + throw new TransportException("Error connecting to broker",e); } catch (IOException e) { - throw new RuntimeException("Error connecting to broker",e); + throw new TransportException("Error connecting to broker",e); } IoSender sender = new IoSender(_socket); @@ -133,28 +129,21 @@ public class IoHandler implements Runnable { InputStream in = _socket.getInputStream(); int read = 0; - while(_socket.isConnected()) + while(read != -1) { - try - { - read = in.read(_readBuf); - if (read > 0) - { - ByteBuffer b = ByteBuffer.allocate(read); - b.put(_readBuf,0,read); - b.flip(); - _receiver.received(b); - } - } - catch(Exception e) + read = in.read(_readBuf); + if (read > 0) { - throw new RuntimeException("Error reading from socket input stream",e); + ByteBuffer b = ByteBuffer.allocate(read); + b.put(_readBuf,0,read); + b.flip(); + _receiver.received(b); } } } catch (IOException e) { - throw new RuntimeException("Error getting input stream from the socket",e); + _receiver.exception(new Exception("Error getting input stream from the socket",e)); } finally { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java index d50442be5c..511d5d5b9e 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.net.Socket; import org.apache.qpidity.transport.Sender; +import org.apache.qpidity.transport.TransportException; public class IoSender implements Sender<java.nio.ByteBuffer> { @@ -39,7 +40,7 @@ public class IoSender implements Sender<java.nio.ByteBuffer> } catch(IOException e) { - throw new RuntimeException("Error getting output stream for socket",e); + throw new TransportException("Error getting output stream for socket",e); } } @@ -66,8 +67,18 @@ public class IoSender implements Sender<java.nio.ByteBuffer> */ private void write(java.nio.ByteBuffer buf) { - byte[] array = new byte[buf.remaining()]; - buf.get(array); + byte[] array = null; + + if (buf.hasArray()) + { + array = buf.array(); + } + else + { + array = new byte[buf.remaining()]; + buf.get(array); + } + if( _socket.isConnected()) { synchronized (lock) @@ -78,14 +89,13 @@ public class IoSender implements Sender<java.nio.ByteBuffer> } catch(Exception e) { - e.fillInStackTrace(); - throw new RuntimeException("Error trying to write to the socket",e); + throw new TransportException("Error trying to write to the socket",e); } } } else { - throw new RuntimeException("Trying to write on a closed socket"); + throw new TransportException("Trying to write on a closed socket"); } } |