summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-27 22:59:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-27 22:59:46 +0000
commit60089dfd1a12303822b3f82816905f0a29a6a746 (patch)
treef37b2dcac2f1c7fbe03377827856903ce99d92df
parent5adf702b6e2c66ad0f7098ed53fcbc7dd568b2d2 (diff)
downloadqpid-python-60089dfd1a12303822b3f82816905f0a29a6a746.tar.gz
QPID-6342 : Fail fast when commands sent in wrong order
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655186 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java45
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java64
3 files changed, 79 insertions, 36 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 4212505d75..3783cd70ac 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -96,6 +96,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
+ enum ConnectionState
+ {
+ INIT,
+ AWAIT_START_OK,
+ AWAIT_SECURE_OK,
+ AWAIT_TUNE_OK,
+ AWAIT_OPEN,
+ OPEN
+ }
+
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
@@ -123,6 +133,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ private ConnectionState _state = ConnectionState.INIT;
+
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
* Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
@@ -469,16 +481,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
serverProperties,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(asByteBuffer(responseBody.generateFrame(0)));
- _sender.flush();
+ writeFrame(responseBody.generateFrame(0));
+ _state = ConnectionState.AWAIT_START_OK;
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
- _sender.flush();
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
}
@@ -1467,6 +1478,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
_logger.debug("RECV[" + channelId + "] ChannelOpen");
}
+ assertState(ConnectionState.OPEN);
// Protect the broker against out of order frame request.
if (_virtualHost == null)
@@ -1503,6 +1515,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
+ void assertState(final ConnectionState requiredState)
+ {
+ if(_state != requiredState)
+ {
+ closeConnection(AMQConstant.COMMAND_INVALID, "Command Invalid", 0);
+
+ }
+ }
+
@Override
public void receiveConnectionOpen(AMQShortString virtualHostName,
AMQShortString capabilities,
@@ -1555,6 +1576,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
writeFrame(responseBody.generateFrame(0));
+ _state = ConnectionState.OPEN;
}
catch (AccessControlException e)
{
@@ -1625,6 +1647,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
}
+ assertState(ConnectionState.AWAIT_SECURE_OK);
+
Broker<?> broker = getBroker();
SubjectCreator subjectCreator = getSubjectCreator();
@@ -1665,6 +1689,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
frameMax,
broker.getConnection_heartBeatDelay());
writeFrame(tuneBody.generateFrame(0));
+ _state = ConnectionState.AWAIT_TUNE_OK;
setAuthorizedSubject(authResult.getSubject());
disposeSaslServer();
break;
@@ -1713,6 +1738,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
+ " ]");
}
+ assertState(ConnectionState.AWAIT_START_OK);
+
Broker<?> broker = getBroker();
_logger.info("SASL Mechanism selected: " + mechanism);
@@ -1774,11 +1801,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
frameMax,
broker.getConnection_heartBeatDelay());
writeFrame(tuneBody.generateFrame(0));
+ _state = ConnectionState.AWAIT_TUNE_OK;
break;
case CONTINUE:
ConnectionSecureBody
secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
writeFrame(secureBody.generateFrame(0));
+
+ _state = ConnectionState.AWAIT_SECURE_OK;
}
}
}
@@ -1797,6 +1827,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
}
+ assertState(ConnectionState.AWAIT_TUNE_OK);
+
initHeartbeats(heartbeat);
int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
@@ -1828,7 +1860,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL))
? 0xFFFFL
: channelMax);
+
}
+ _state = ConnectionState.AWAIT_OPEN;
+
}
public int getBinaryDataLimit()
@@ -1928,6 +1963,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
{
+ assertState(ConnectionState.OPEN);
+
ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId);
if(channelMethodProcessor == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 7407890b58..6c6b746cf2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -275,6 +275,12 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
}
+ void assertState(final ConnectionState requiredState)
+ {
+ // no-op
+ }
+
+
private static final AtomicInteger portNumber = new AtomicInteger(0);
private static class TestNetworkConnection implements NetworkConnection
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
index 32a45da60c..deed32346f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
@@ -42,7 +42,6 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
throws AMQFrameDecodingException, IOException
{
ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
- ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
final int classAndMethod = in.readInt();
int classId = classAndMethod >> 16;
int methodId = classAndMethod & 0xFFFF;
@@ -115,116 +114,117 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
ChannelOpenBody.process(channelId, in, methodProcessor);
break;
case 0x00140014:
- ChannelFlowBody.process(in, channelMethodProcessor);
+ ChannelFlowBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00140015:
- ChannelFlowOkBody.process(in, channelMethodProcessor);
+ ChannelFlowOkBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00140028:
- ChannelCloseBody.process(in, channelMethodProcessor);
+ ChannelCloseBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00140029:
- channelMethodProcessor.receiveChannelCloseOk();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveChannelCloseOk();
break;
// ACCESS_CLASS:
case 0x001e000a:
- AccessRequestBody.process(in, channelMethodProcessor);
+ AccessRequestBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// EXCHANGE_CLASS:
case 0x0028000a:
- ExchangeDeclareBody.process(in, channelMethodProcessor);
+ ExchangeDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00280014:
- ExchangeDeleteBody.process(in, channelMethodProcessor);
+ ExchangeDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00280016:
- ExchangeBoundBody.process(in, channelMethodProcessor);
+ ExchangeBoundBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// QUEUE_CLASS:
case 0x0032000a:
- QueueDeclareBody.process(in, channelMethodProcessor);
+ QueueDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00320014:
- QueueBindBody.process(in, channelMethodProcessor);
+ QueueBindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x0032001e:
- QueuePurgeBody.process(in, channelMethodProcessor);
+ QueuePurgeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00320028:
- QueueDeleteBody.process(in, channelMethodProcessor);
+ QueueDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x00320032:
- QueueUnbindBody.process(in, channelMethodProcessor);
+ QueueUnbindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// BASIC_CLASS:
case 0x003c000a:
- BasicQosBody.process(in, channelMethodProcessor);
+ BasicQosBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0014:
- BasicConsumeBody.process(in, channelMethodProcessor);
+ BasicConsumeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c001e:
- BasicCancelBody.process(in, channelMethodProcessor);
+ BasicCancelBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0028:
- BasicPublishBody.process(in, channelMethodProcessor);
+ BasicPublishBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0046:
- BasicGetBody.process(in, channelMethodProcessor);
+ BasicGetBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0050:
- BasicAckBody.process(in, channelMethodProcessor);
+ BasicAckBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c005a:
- BasicRejectBody.process(in, channelMethodProcessor);
+ BasicRejectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0064:
- BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(),
+ methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0066:
- BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c006e:
- BasicRecoverSyncBody.process(in, channelMethodProcessor);
+ BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
case 0x003c0078:
- BasicNackBody.process(in, channelMethodProcessor);
+ BasicNackBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// CONFIRM CLASS:
case 0x0055000a:
- ConfirmSelectBody.process(in, channelMethodProcessor);
+ ConfirmSelectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
break;
// TX_CLASS:
case 0x005a000a:
- if(!channelMethodProcessor.ignoreAllButCloseOk())
+ if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
{
- channelMethodProcessor.receiveTxSelect();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveTxSelect();
}
break;
case 0x005a0014:
- if(!channelMethodProcessor.ignoreAllButCloseOk())
+ if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
{
- channelMethodProcessor.receiveTxCommit();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveTxCommit();
}
break;
case 0x005a001e:
- if(!channelMethodProcessor.ignoreAllButCloseOk())
+ if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
{
- channelMethodProcessor.receiveTxRollback();
+ methodProcessor.getChannelMethodProcessor(channelId).receiveTxRollback();
}
break;