diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-04 23:36:00 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-04 23:36:00 +0000 |
commit | 46507f19d7d4fa097d0282b61f2071f557beebd5 (patch) | |
tree | d640946c53737f5ac07f25b770b0a39c528115ca | |
parent | b5fc52d5e0901feb3d4b4bcdcf254e95a0375f42 (diff) | |
download | qpid-python-46507f19d7d4fa097d0282b61f2071f557beebd5.tar.gz |
QPID-3813: allow configuring the default version reply to an unsupported protocol version initiation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1240644 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java | 7 | ||||
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java | 11 | ||||
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 65 | ||||
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java | 14 | ||||
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java | 22 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java (renamed from qpid/java/systests/src/main/java/org/apache/qpid/server/DisablingProtocolsTest.java) | 39 | ||||
-rwxr-xr-x | qpid/java/test-profiles/CPPExcludes | 2 | ||||
-rw-r--r-- | qpid/java/test-profiles/JavaPre010Excludes | 2 |
8 files changed, 129 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index a5dc3bb535..072f8dc132 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -187,18 +187,21 @@ public class Broker bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); } + final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply(); + if (!serverConfig.getSSLOnly()) { for(int port : ports) { final Set<AmqpProtocolVersion> supported = getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, serverConfig); + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(supported); + new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply); transport.accept(settings, protocolEngineFactory, null); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), @@ -223,7 +226,7 @@ public class Broker final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(supported); + new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply); transport.accept(settings, protocolEngineFactory, sslContext); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort), diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index c2425b6794..259902a938 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -31,6 +31,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.signal.SignalHandlerTask; @@ -91,6 +92,7 @@ public class ServerConfiguration extends ConfigurationPlugin public static final String CONNECTOR_AMQP091ENABLED = "connector.amqp091enabled"; public static final String CONNECTOR_AMQP09ENABLED = "connector.amqp09enabled"; public static final String CONNECTOR_AMQP08ENABLED = "connector.amqp08enabled"; + public static final String CONNECTOR_AMQP_SUPPORTED_REPLY = "connector.amqpDefaultSupportedProtocolReply"; { envVarMap.put("QPID_PORT", "connector.port"); @@ -861,4 +863,13 @@ public class ServerConfiguration extends ConfigurationPlugin return getConfig().getBoolean(CONNECTOR_AMQP08ENABLED, true); } + /** + * Returns the configured default reply to an unsupported AMQP protocol initiation, or null if there is none + */ + public AmqpProtocolVersion getDefaultSupportedProtocolReply() + { + String reply = getConfig().getString(CONNECTOR_AMQP_SUPPORTED_REPLY, null); + + return reply == null ? null : AmqpProtocolVersion.valueOf(reply); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index e361bd3ece..30bbda7cff 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -27,7 +27,6 @@ import java.nio.ByteBuffer; import java.util.Set; import org.apache.log4j.Logger; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; @@ -46,26 +45,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private IApplicationRegistry _appRegistry; private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private final AmqpProtocolVersion _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); - public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, - Set<AmqpProtocolVersion> supported, - NetworkConnection network, - long id) + public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedReply, + final long id, + final NetworkConnection network) { - this(appRegistry, supported,id); + this(appRegistry, supported, defaultSupportedReply, id); setNetworkConnection(network); } - public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, - Set<AmqpProtocolVersion> supported, - long id) + public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedReply, + final long id) { + if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply)) + { + throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply + + ") to an unsupported protocol version initiation is itself not supported!"); + } + _id = id; _appRegistry = appRegistry; _supported = supported; - + _defaultSupportedReply = defaultSupportedReply; } @@ -390,16 +398,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine ServerProtocolEngine newDelegate = null; - byte[] newestSupported = null; - AmqpProtocolVersion newestSupportedVersion = null; + byte[] supportedReplyBytes = null; + byte[] defaultSupportedReplyBytes = null; + AmqpProtocolVersion supportedReplyVersion = null; + //Check the supported versions for a header match, and if there is one save the + //delegate. Also save most recent supported version and associated reply header bytes for(int i = 0; newDelegate == null && i < _creators.length; i++) { - if(_supported.contains(_creators[i].getVersion())) { - newestSupported = _creators[i].getHeaderIdentifier(); - newestSupportedVersion = _creators[i].getVersion(); + supportedReplyBytes = _creators[i].getHeaderIdentifier(); + supportedReplyVersion = _creators[i].getVersion(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j<compareBytes.length; j++) @@ -411,16 +421,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine newDelegate = _creators[i].getProtocolEngine(); } } + + //If there is a configured default reply to an unsupported version initiation, + //then save the associated reply header bytes when we encounter them + if(_defaultSupportedReply != null && _creators[i].getVersion() == _defaultSupportedReply) + { + defaultSupportedReplyBytes = _creators[i].getHeaderIdentifier(); + } } - // If no delegate is found then send back the most recent support protocol version id + // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { + //if a default reply was specified use its reply header instead of the most recent supported version + if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion)) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Default reply to unsupported protocol version was configured, changing reply from " + + supportedReplyVersion + " to " + _defaultSupportedReply); + } + + supportedReplyBytes = defaultSupportedReplyBytes; + supportedReplyVersion = _defaultSupportedReply; + } if(_logger.isDebugEnabled()) { - _logger.debug("Unsupported protocol version requested, replying with: " + newestSupportedVersion); + _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion); } - _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.send(ByteBuffer.wrap(supportedReplyBytes)); _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index ce5095dd2b..abe361cceb 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -35,21 +35,29 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private final IApplicationRegistry _appRegistry; private final Set<AmqpProtocolVersion> _supported; + private final AmqpProtocolVersion _defaultSupportedReply; - public MultiVersionProtocolEngineFactory(Set<AmqpProtocolVersion> supportedVersions) + public MultiVersionProtocolEngineFactory(final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply) { + if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply)) + { + throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply + + ") to an unsupported protocol version initiation is itself not supported!"); + } + _appRegistry = ApplicationRegistry.getInstance(); _supported = supportedVersions; + _defaultSupportedReply = defaultSupportedReply; } public ServerProtocolEngine newProtocolEngine(NetworkConnection network) { - return new MultiVersionProtocolEngine(_appRegistry, _supported, network, ID_GENERATOR.getAndIncrement()); + return new MultiVersionProtocolEngine(_appRegistry, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement(), network); } public ServerProtocolEngine newProtocolEngine() { - return new MultiVersionProtocolEngine(_appRegistry, _supported, ID_GENERATOR.getAndIncrement()); + return new MultiVersionProtocolEngine(_appRegistry, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement()); } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 24a735c274..1885b2fdd4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -121,7 +121,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory(versions); + new MultiVersionProtocolEngineFactory(versions, null); //create a dummy to retrieve the 'current' ID number long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId(); @@ -144,4 +144,24 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase previousId = expectedID; } } + + /** + * Test to verify that when requesting a ProtocolEngineFactory to produce engines having a default reply to unsupported + * version initiations, there is enforcement that the default reply is itself a supported protocol version. + */ + public void testUnsupportedDefaultReplyCausesIllegalArgumentException() + { + Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); + versions.remove(AmqpProtocolVersion.v0_9); + + try + { + new MultiVersionProtocolEngineFactory(versions, AmqpProtocolVersion.v0_9); + fail("should not have been allowed to create the factory"); + } + catch(IllegalArgumentException iae) + { + //expected + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/DisablingProtocolsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java index 5b3ac8d745..c42bb3b1fa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/DisablingProtocolsTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java @@ -23,13 +23,15 @@ package org.apache.qpid.server; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.test.utils.QpidBrokerTestCase; /** * Tests to validate it is possible to disable support for particular protocol - * versions entirely, rather than selectively excluding them on particular ports. + * versions entirely, rather than selectively excluding them on particular ports, + * and it is possible to configure the reply to an unsupported protocol initiation. */ -public class DisablingProtocolsTest extends QpidBrokerTestCase +public class SupportedProtocolVersionsTest extends QpidBrokerTestCase { public void setUp() throws Exception { @@ -90,8 +92,8 @@ public class DisablingProtocolsTest extends QpidBrokerTestCase public void testDisabling091and010() throws Exception { //disable 0-91 and 0-10 support - setConfigurationProperty("connector.amqp010enabled", "false"); - setConfigurationProperty("connector.amqp091enabled", "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false"); super.setUp(); @@ -106,9 +108,9 @@ public class DisablingProtocolsTest extends QpidBrokerTestCase public void testDisabling09and091and010() throws Exception { //disable 0-9, 0-91 and 0-10 support - setConfigurationProperty("connector.amqp09enabled", "false"); - setConfigurationProperty("connector.amqp091enabled", "false"); - setConfigurationProperty("connector.amqp010enabled", "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP09ENABLED, "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); super.setUp(); @@ -119,4 +121,27 @@ public class DisablingProtocolsTest extends QpidBrokerTestCase assertEquals("Unexpected protocol version in use", ProtocolVersion.v8_0, connection.getProtocolVersion()); connection.close(); } + + public void testConfiguringReplyingToUnsupported010ProtocolInitiationWith09insteadOf091() throws Exception + { + //disable 0-10 support, and set the default unsupported protocol initiation reply to 0-9 + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_9"); + + super.setUp(); + + //Verify initially requesting a 0-10 connection now negotiates a 0-9 connection as the + //broker should reply with its 'default unsupported protocol initiation reply' as opposed + //to the previous behaviour of the highest supported protocol version. + setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + AMQConnection connection = (AMQConnection) getConnection(); + assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_9, connection.getProtocolVersion()); + connection.close(); + + //Verify requesting a 0-91 connection directly still works, as its support is still enabled + setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1"); + connection = (AMQConnection) getConnection(); + assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion()); + connection.close(); + } }
\ No newline at end of file diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 61b76bbdfc..52488b3ffa 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -153,7 +153,7 @@ org.apache.qpid.test.unit.transacted.TransactionTimeoutTest#* // Java broker only org.apache.qpid.server.logging.management.LoggingManagementMBeanTest#* org.apache.qpid.server.management.AMQUserManagementMBeanTest#* -org.apache.qpid.server.DisablingProtocolsTest#* +org.apache.qpid.server.SupportedProtocolVersionsTest#* // QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs. org.apache.qpid.server.failover.FailoverMethodTest#* diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 5d46010649..ada22638be 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -24,7 +24,7 @@ // These tests requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend org.apache.qpid.server.message.MessageProtocolConversionTest#* -org.apache.qpid.server.DisablingProtocolsTest#* +org.apache.qpid.server.SupportedProtocolVersionsTest#* // QPID-2478 test fails when run against broker using 0-8/9 |