diff options
Diffstat (limited to 'java')
3 files changed, 20 insertions, 13 deletions
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java index ec035bd60d..6940f91e22 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.amqp_1_0.framing; +import java.nio.ByteBuffer; + import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import java.nio.ByteBuffer; - public class AMQPProtocolHeaderHandler implements ProtocolHandler { private ConnectionEndpoint _connection; private static final byte MAJOR_VERSION = (byte) 1; private static final byte MINOR_VERSION = (byte) 0; + private boolean _done; enum State { AWAITING_MAJOR, @@ -53,13 +54,13 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler { case AWAITING_MAJOR: _state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { break; } case AWAITING_MINOR: _state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { break; } @@ -67,11 +68,13 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler byte revision = in.get(); _connection.protocolHeaderReceived(MAJOR_VERSION, MINOR_VERSION, revision); ProtocolHandler handler = new FrameHandler(_connection); + _done = true; return handler.parse(in); } } if(_state == State.ERROR) { + _done = true; _connection.invalidHeaderReceived(); } return this; @@ -80,6 +83,6 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler public boolean isDone() { - return _state != State.ERROR && !_connection.closedForInput(); + return _done || _connection.closedForInput(); } } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java index 4ab90df92a..21a2766f17 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java @@ -19,16 +19,17 @@ package org.apache.qpid.amqp_1_0.framing; +import java.nio.ByteBuffer; + import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import java.nio.ByteBuffer; - public class SASLProtocolHeaderHandler implements ProtocolHandler { private ConnectionEndpoint _connection; private static final byte MAJOR_VERSION = (byte) 1; private static final byte MINOR_VERSION = (byte) 0; + private boolean _done; enum State { AWAITING_MAJOR, @@ -54,26 +55,30 @@ public class SASLProtocolHeaderHandler implements ProtocolHandler { case AWAITING_MAJOR: _state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { + _done = true; break; } case AWAITING_MINOR: _state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { + _done = true; break; } case AWAITING_REVISION: byte revision = in.get(); _connection.protocolHeaderReceived(MAJOR_VERSION, MINOR_VERSION, revision); ProtocolHandler handler = new SASLFrameHandler(_connection); + _done = true; return handler.parse(in); } } if(_state == State.ERROR) { _connection.invalidHeaderReceived(); + } return this; @@ -81,6 +86,6 @@ public class SASLProtocolHeaderHandler implements ProtocolHandler public boolean isDone() { - return _state != State.ERROR && !_connection.closedForInput(); + return _done || _connection.closedForInput(); } } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 873f4e8f53..b48cdbe201 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -181,7 +181,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { synchronized (getLock()) { - while (!_saslComplete) + while (!(_saslComplete || _closedForInput)) { try { @@ -711,8 +711,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour public void invalidHeaderReceived() { - // TODO - _closedForInput = true; + setClosedForInput(true); } public synchronized boolean closedForInput() |