summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-02-04 23:36:00 +0000
committerRobert Gemmell <robbie@apache.org>2012-02-04 23:36:00 +0000
commit46507f19d7d4fa097d0282b61f2071f557beebd5 (patch)
treed640946c53737f5ac07f25b770b0a39c528115ca
parentb5fc52d5e0901feb3d4b4bcdcf254e95a0375f42 (diff)
downloadqpid-python-46507f19d7d4fa097d0282b61f2071f557beebd5.tar.gz
QPID-3813: allow configuring the default version reply to an unsupported protocol version initiation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1240644 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java11
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java65
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java (renamed from qpid/java/systests/src/main/java/org/apache/qpid/server/DisablingProtocolsTest.java)39
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes2
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes2
8 files changed, 129 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index a5dc3bb535..072f8dc132 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -187,18 +187,21 @@ public class Broker
bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
}
+ final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply();
+
if (!serverConfig.getSSLOnly())
{
for(int port : ports)
{
final Set<AmqpProtocolVersion> supported =
getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, serverConfig);
+
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(supported);
+ new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
transport.accept(settings, protocolEngineFactory, null);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
@@ -223,7 +226,7 @@ public class Broker
final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(supported);
+ new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
transport.accept(settings, protocolEngineFactory, sslContext);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index c2425b6794..259902a938 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.signal.SignalHandlerTask;
@@ -91,6 +92,7 @@ public class ServerConfiguration extends ConfigurationPlugin
public static final String CONNECTOR_AMQP091ENABLED = "connector.amqp091enabled";
public static final String CONNECTOR_AMQP09ENABLED = "connector.amqp09enabled";
public static final String CONNECTOR_AMQP08ENABLED = "connector.amqp08enabled";
+ public static final String CONNECTOR_AMQP_SUPPORTED_REPLY = "connector.amqpDefaultSupportedProtocolReply";
{
envVarMap.put("QPID_PORT", "connector.port");
@@ -861,4 +863,13 @@ public class ServerConfiguration extends ConfigurationPlugin
return getConfig().getBoolean(CONNECTOR_AMQP08ENABLED, true);
}
+ /**
+ * Returns the configured default reply to an unsupported AMQP protocol initiation, or null if there is none
+ */
+ public AmqpProtocolVersion getDefaultSupportedProtocolReply()
+ {
+ String reply = getConfig().getString(CONNECTOR_AMQP_SUPPORTED_REPLY, null);
+
+ return reply == null ? null : AmqpProtocolVersion.valueOf(reply);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index e361bd3ece..30bbda7cff 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.log4j.Logger;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
@@ -46,26 +45,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private final AmqpProtocolVersion _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- Set<AmqpProtocolVersion> supported,
- NetworkConnection network,
- long id)
+ public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedReply,
+ final long id,
+ final NetworkConnection network)
{
- this(appRegistry, supported,id);
+ this(appRegistry, supported, defaultSupportedReply, id);
setNetworkConnection(network);
}
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- Set<AmqpProtocolVersion> supported,
- long id)
+ public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedReply,
+ final long id)
{
+ if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
+ {
+ throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+ + ") to an unsupported protocol version initiation is itself not supported!");
+ }
+
_id = id;
_appRegistry = appRegistry;
_supported = supported;
-
+ _defaultSupportedReply = defaultSupportedReply;
}
@@ -390,16 +398,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
ServerProtocolEngine newDelegate = null;
- byte[] newestSupported = null;
- AmqpProtocolVersion newestSupportedVersion = null;
+ byte[] supportedReplyBytes = null;
+ byte[] defaultSupportedReplyBytes = null;
+ AmqpProtocolVersion supportedReplyVersion = null;
+ //Check the supported versions for a header match, and if there is one save the
+ //delegate. Also save most recent supported version and associated reply header bytes
for(int i = 0; newDelegate == null && i < _creators.length; i++)
{
-
if(_supported.contains(_creators[i].getVersion()))
{
- newestSupported = _creators[i].getHeaderIdentifier();
- newestSupportedVersion = _creators[i].getVersion();
+ supportedReplyBytes = _creators[i].getHeaderIdentifier();
+ supportedReplyVersion = _creators[i].getVersion();
byte[] compareBytes = _creators[i].getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
@@ -411,16 +421,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
newDelegate = _creators[i].getProtocolEngine();
}
}
+
+ //If there is a configured default reply to an unsupported version initiation,
+ //then save the associated reply header bytes when we encounter them
+ if(_defaultSupportedReply != null && _creators[i].getVersion() == _defaultSupportedReply)
+ {
+ defaultSupportedReplyBytes = _creators[i].getHeaderIdentifier();
+ }
}
- // If no delegate is found then send back the most recent support protocol version id
+ // If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
+ //if a default reply was specified use its reply header instead of the most recent supported version
+ if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion))
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Default reply to unsupported protocol version was configured, changing reply from "
+ + supportedReplyVersion + " to " + _defaultSupportedReply);
+ }
+
+ supportedReplyBytes = defaultSupportedReplyBytes;
+ supportedReplyVersion = _defaultSupportedReply;
+ }
if(_logger.isDebugEnabled())
{
- _logger.debug("Unsupported protocol version requested, replying with: " + newestSupportedVersion);
+ _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion);
}
- _sender.send(ByteBuffer.wrap(newestSupported));
+ _sender.send(ByteBuffer.wrap(supportedReplyBytes));
_sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index ce5095dd2b..abe361cceb 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -35,21 +35,29 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
private final IApplicationRegistry _appRegistry;
private final Set<AmqpProtocolVersion> _supported;
+ private final AmqpProtocolVersion _defaultSupportedReply;
- public MultiVersionProtocolEngineFactory(Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory(final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply)
{
+ if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
+ {
+ throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+ + ") to an unsupported protocol version initiation is itself not supported!");
+ }
+
_appRegistry = ApplicationRegistry.getInstance();
_supported = supportedVersions;
+ _defaultSupportedReply = defaultSupportedReply;
}
public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new MultiVersionProtocolEngine(_appRegistry, _supported, network, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_appRegistry, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement(), network);
}
public ServerProtocolEngine newProtocolEngine()
{
- return new MultiVersionProtocolEngine(_appRegistry, _supported, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_appRegistry, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement());
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 24a735c274..1885b2fdd4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -121,7 +121,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(versions);
+ new MultiVersionProtocolEngineFactory(versions, null);
//create a dummy to retrieve the 'current' ID number
long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId();
@@ -144,4 +144,24 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
previousId = expectedID;
}
}
+
+ /**
+ * Test to verify that when requesting a ProtocolEngineFactory to produce engines having a default reply to unsupported
+ * version initiations, there is enforcement that the default reply is itself a supported protocol version.
+ */
+ public void testUnsupportedDefaultReplyCausesIllegalArgumentException()
+ {
+ Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
+ versions.remove(AmqpProtocolVersion.v0_9);
+
+ try
+ {
+ new MultiVersionProtocolEngineFactory(versions, AmqpProtocolVersion.v0_9);
+ fail("should not have been allowed to create the factory");
+ }
+ catch(IllegalArgumentException iae)
+ {
+ //expected
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/DisablingProtocolsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
index 5b3ac8d745..c42bb3b1fa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/DisablingProtocolsTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
@@ -23,13 +23,15 @@ package org.apache.qpid.server;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
* Tests to validate it is possible to disable support for particular protocol
- * versions entirely, rather than selectively excluding them on particular ports.
+ * versions entirely, rather than selectively excluding them on particular ports,
+ * and it is possible to configure the reply to an unsupported protocol initiation.
*/
-public class DisablingProtocolsTest extends QpidBrokerTestCase
+public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
{
public void setUp() throws Exception
{
@@ -90,8 +92,8 @@ public class DisablingProtocolsTest extends QpidBrokerTestCase
public void testDisabling091and010() throws Exception
{
//disable 0-91 and 0-10 support
- setConfigurationProperty("connector.amqp010enabled", "false");
- setConfigurationProperty("connector.amqp091enabled", "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false");
super.setUp();
@@ -106,9 +108,9 @@ public class DisablingProtocolsTest extends QpidBrokerTestCase
public void testDisabling09and091and010() throws Exception
{
//disable 0-9, 0-91 and 0-10 support
- setConfigurationProperty("connector.amqp09enabled", "false");
- setConfigurationProperty("connector.amqp091enabled", "false");
- setConfigurationProperty("connector.amqp010enabled", "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP09ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
super.setUp();
@@ -119,4 +121,27 @@ public class DisablingProtocolsTest extends QpidBrokerTestCase
assertEquals("Unexpected protocol version in use", ProtocolVersion.v8_0, connection.getProtocolVersion());
connection.close();
}
+
+ public void testConfiguringReplyingToUnsupported010ProtocolInitiationWith09insteadOf091() throws Exception
+ {
+ //disable 0-10 support, and set the default unsupported protocol initiation reply to 0-9
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_9");
+
+ super.setUp();
+
+ //Verify initially requesting a 0-10 connection now negotiates a 0-9 connection as the
+ //broker should reply with its 'default unsupported protocol initiation reply' as opposed
+ //to the previous behaviour of the highest supported protocol version.
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ AMQConnection connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_9, connection.getProtocolVersion());
+ connection.close();
+
+ //Verify requesting a 0-91 connection directly still works, as its support is still enabled
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1");
+ connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion());
+ connection.close();
+ }
} \ No newline at end of file
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 61b76bbdfc..52488b3ffa 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -153,7 +153,7 @@ org.apache.qpid.test.unit.transacted.TransactionTimeoutTest#*
// Java broker only
org.apache.qpid.server.logging.management.LoggingManagementMBeanTest#*
org.apache.qpid.server.management.AMQUserManagementMBeanTest#*
-org.apache.qpid.server.DisablingProtocolsTest#*
+org.apache.qpid.server.SupportedProtocolVersionsTest#*
// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs.
org.apache.qpid.server.failover.FailoverMethodTest#*
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index 5d46010649..ada22638be 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -24,7 +24,7 @@
// These tests requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently
org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
org.apache.qpid.server.message.MessageProtocolConversionTest#*
-org.apache.qpid.server.DisablingProtocolsTest#*
+org.apache.qpid.server.SupportedProtocolVersionsTest#*
// QPID-2478 test fails when run against broker using 0-8/9