summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-22 01:00:18 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-22 01:00:18 +0000
commit802a192c035d7e97750ed7c3185543911b6d7756 (patch)
tree8fa99579207b02493608739fda59d0d676fbe7d5 /java
parent1ce8b46089c9a92f8d4ed3bc639fab5d8bc2ba3f (diff)
downloadqpid-python-802a192c035d7e97750ed7c3185543911b6d7756.tar.gz
QPID-5400 : stop waiting for SASL completion if the connection is closed for input
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1612441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java13
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java15
-rw-r--r--java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java5
3 files changed, 20 insertions, 13 deletions
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
index ec035bd60d..6940f91e22 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.amqp_1_0.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import java.nio.ByteBuffer;
-
public class AMQPProtocolHeaderHandler implements ProtocolHandler
{
private ConnectionEndpoint _connection;
private static final byte MAJOR_VERSION = (byte) 1;
private static final byte MINOR_VERSION = (byte) 0;
+ private boolean _done;
enum State {
AWAITING_MAJOR,
@@ -53,13 +54,13 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler
{
case AWAITING_MAJOR:
_state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR : State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
break;
}
case AWAITING_MINOR:
_state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR : State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
break;
}
@@ -67,11 +68,13 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler
byte revision = in.get();
_connection.protocolHeaderReceived(MAJOR_VERSION, MINOR_VERSION, revision);
ProtocolHandler handler = new FrameHandler(_connection);
+ _done = true;
return handler.parse(in);
}
}
if(_state == State.ERROR)
{
+ _done = true;
_connection.invalidHeaderReceived();
}
return this;
@@ -80,6 +83,6 @@ public class AMQPProtocolHeaderHandler implements ProtocolHandler
public boolean isDone()
{
- return _state != State.ERROR && !_connection.closedForInput();
+ return _done || _connection.closedForInput();
}
}
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
index 4ab90df92a..21a2766f17 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
@@ -19,16 +19,17 @@
package org.apache.qpid.amqp_1_0.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import java.nio.ByteBuffer;
-
public class SASLProtocolHeaderHandler implements ProtocolHandler
{
private ConnectionEndpoint _connection;
private static final byte MAJOR_VERSION = (byte) 1;
private static final byte MINOR_VERSION = (byte) 0;
+ private boolean _done;
enum State {
AWAITING_MAJOR,
@@ -54,26 +55,30 @@ public class SASLProtocolHeaderHandler implements ProtocolHandler
{
case AWAITING_MAJOR:
_state = in.get() == MAJOR_VERSION ? State.AWAITING_MINOR : State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
+ _done = true;
break;
}
case AWAITING_MINOR:
_state = in.get() == MINOR_VERSION ? State.AWAITING_MINOR : State.ERROR;
- if(!in.hasRemaining())
+ if(_state == State.ERROR || !in.hasRemaining())
{
+ _done = true;
break;
}
case AWAITING_REVISION:
byte revision = in.get();
_connection.protocolHeaderReceived(MAJOR_VERSION, MINOR_VERSION, revision);
ProtocolHandler handler = new SASLFrameHandler(_connection);
+ _done = true;
return handler.parse(in);
}
}
if(_state == State.ERROR)
{
_connection.invalidHeaderReceived();
+
}
return this;
@@ -81,6 +86,6 @@ public class SASLProtocolHeaderHandler implements ProtocolHandler
public boolean isDone()
{
- return _state != State.ERROR && !_connection.closedForInput();
+ return _done || _connection.closedForInput();
}
}
diff --git a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 873f4e8f53..b48cdbe201 100644
--- a/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -181,7 +181,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
{
synchronized (getLock())
{
- while (!_saslComplete)
+ while (!(_saslComplete || _closedForInput))
{
try
{
@@ -711,8 +711,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
public void invalidHeaderReceived()
{
- // TODO
- _closedForInput = true;
+ setClosedForInput(true);
}
public synchronized boolean closedForInput()