diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-22 01:00:18 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-22 01:00:18 +0000 |
commit | 802a192c035d7e97750ed7c3185543911b6d7756 (patch) | |
tree | 8fa99579207b02493608739fda59d0d676fbe7d5 /java | |
parent | 1ce8b46089c9a92f8d4ed3bc639fab5d8bc2ba3f (diff) | |
download | qpid-python-802a192c035d7e97750ed7c3185543911b6d7756.tar.gz |
QPID-5400 : stop waiting for SASL completion if the connection is closed for input
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1612441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 20 insertions, 13 deletions
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java index ec035bd60d..6940f91e22 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.amqp_1_0.framing; +import java.nio.ByteBuffer; + import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import java.nio.ByteBuffer; - public class AMQPProtocolHeaderHandler implements ProtocolHandler { private ConnectionEndpoint _connection; private static final byte MAJOR_VERSION = (byte) 1; private static final byte MINOR_VERSION = (byte) 0; + private boolean _done; enum State { AWAITING_MAJOR, @@ -53,13 +54,13 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler { case AWAITING_MAJOR: _state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { break; } case AWAITING_MINOR: _state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { break; } @@ -67,11 +68,13 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler byte revision = in.get(); _connection.protocolHeaderReceived(MAJOR_VERSION, MINOR_VERSION, revision); ProtocolHandler handler = new FrameHandler(_connection); + _done = true; return handler.parse(in); } } if(_state == State.ERROR) { + _done = true; _connection.invalidHeaderReceived(); } return this; @@ -80,6 +83,6 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler public boolean isDone() { - return _state != State.ERROR && !_connection.closedForInput(); + return _done || _connection.closedForInput(); } } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java index 4ab90df92a..21a2766f17 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java @@ -19,16 +19,17 @@ package org.apache.qpid.amqp_1_0.framing; +import java.nio.ByteBuffer; + import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import java.nio.ByteBuffer; - public class SASLProtocolHeaderHandler implements ProtocolHandler { private ConnectionEndpoint _connection; private static final byte MAJOR_VERSION = (byte) 1; private static final byte MINOR_VERSION = (byte) 0; + private boolean _done; enum State { AWAITING_MAJOR, @@ -54,26 +55,30 @@ public class SASLProtocolHeaderHandler implements ProtocolHandler { case AWAITING_MAJOR: _state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { + _done = true; break; } case AWAITING_MINOR: _state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR : State.ERROR; - if(!in.hasRemaining()) + if(_state == State.ERROR || !in.hasRemaining()) { + _done = true; break; } case AWAITING_REVISION: byte revision = in.get(); _connection.protocolHeaderReceived(MAJOR_VERSION, MINOR_VERSION, revision); ProtocolHandler handler = new SASLFrameHandler(_connection); + _done = true; return handler.parse(in); } } if(_state == State.ERROR) { _connection.invalidHeaderReceived(); + } return this; @@ -81,6 +86,6 @@ public class SASLProtocolHeaderHandler implements ProtocolHandler public boolean isDone() { - return _state != State.ERROR && !_connection.closedForInput(); + return _done || _connection.closedForInput(); } } diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 873f4e8f53..b48cdbe201 100644 --- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -181,7 +181,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { synchronized (getLock()) { - while (!_saslComplete) + while (!(_saslComplete || _closedForInput)) { try { @@ -711,8 +711,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour public void invalidHeaderReceived() { - // TODO - _closedForInput = true; + setClosedForInput(true); } public synchronized boolean closedForInput() |