summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-27 22:59:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-27 22:59:46 +0000
commit60089dfd1a12303822b3f82816905f0a29a6a746 (patch)
treef37b2dcac2f1c7fbe03377827856903ce99d92df /qpid/java/broker-plugins
parent5adf702b6e2c66ad0f7098ed53fcbc7dd568b2d2 (diff)
downloadqpid-python-60089dfd1a12303822b3f82816905f0a29a6a746.tar.gz
QPID-6342 : Fail fast when commands sent in wrong order
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655186 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java45
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java6
2 files changed, 47 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 4212505d75..3783cd70ac 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -96,6 +96,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
+ enum ConnectionState
+ {
+ INIT,
+ AWAIT_START_OK,
+ AWAIT_SECURE_OK,
+ AWAIT_TUNE_OK,
+ AWAIT_OPEN,
+ OPEN
+ }
+
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
@@ -123,6 +133,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ private ConnectionState _state = ConnectionState.INIT;
+
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
@@ -469,16 +481,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
serverProperties,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(asByteBuffer(responseBody.generateFrame(0)));
- _sender.flush();
+ writeFrame(responseBody.generateFrame(0));
+ _state = ConnectionState.AWAIT_START_OK;
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
- _sender.flush();
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
}
@@ -1467,6 +1478,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_logger.debug("RECV[" + channelId + "] ChannelOpen");
}
+ assertState(ConnectionState.OPEN);
// Protect the broker against out of order frame request.
if (_virtualHost == null)
@@ -1503,6 +1515,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
+ void assertState(final ConnectionState requiredState)
+ {
+ if(_state != requiredState)
+ {
+ closeConnection(AMQConstant.COMMAND_INVALID, "Command Invalid", 0);
+
+ }
+ }
+
@Override
public void receiveConnectionOpen(AMQShortString virtualHostName,
AMQShortString capabilities,
@@ -1555,6 +1576,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
writeFrame(responseBody.generateFrame(0));
+ _state = ConnectionState.OPEN;
}
catch (AccessControlException e)
{
@@ -1625,6 +1647,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
}
+ assertState(ConnectionState.AWAIT_SECURE_OK);
+
Broker<?> broker = getBroker();
SubjectCreator subjectCreator = getSubjectCreator();
@@ -1665,6 +1689,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
frameMax,
broker.getConnection_heartBeatDelay());
writeFrame(tuneBody.generateFrame(0));
+ _state = ConnectionState.AWAIT_TUNE_OK;
setAuthorizedSubject(authResult.getSubject());
disposeSaslServer();
break;
@@ -1713,6 +1738,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
+ " ]");
}
+ assertState(ConnectionState.AWAIT_START_OK);
+
Broker<?> broker = getBroker();
_logger.info("SASL Mechanism selected: " + mechanism);
@@ -1774,11 +1801,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
frameMax,
broker.getConnection_heartBeatDelay());
writeFrame(tuneBody.generateFrame(0));
+ _state = ConnectionState.AWAIT_TUNE_OK;
break;
case CONTINUE:
ConnectionSecureBody
secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
writeFrame(secureBody.generateFrame(0));
+
+ _state = ConnectionState.AWAIT_SECURE_OK;
}
}
}
@@ -1797,6 +1827,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
}
+ assertState(ConnectionState.AWAIT_TUNE_OK);
+
initHeartbeats(heartbeat);
int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
@@ -1828,7 +1860,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL))
? 0xFFFFL
: channelMax);
+
}
+ _state = ConnectionState.AWAIT_OPEN;
+
}
public int getBinaryDataLimit()
@@ -1928,6 +1963,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
{
+ assertState(ConnectionState.OPEN);
+
ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId);
if(channelMethodProcessor == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 7407890b58..6c6b746cf2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -275,6 +275,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
}
+ void assertState(final ConnectionState requiredState)
+ {
+ // no-op
+ }
+
+
private static final AtomicInteger portNumber = new AtomicInteger(0);
private static class TestNetworkConnection implements NetworkConnection