diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 86 |
1 files changed, 64 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index a71d396919..ce20690f66 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -21,6 +21,11 @@ package org.apache.qpid.server.protocol; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Set; + import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -29,10 +34,6 @@ import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.Set; - public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); @@ -44,29 +45,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private IApplicationRegistry _appRegistry; private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private final AmqpProtocolVersion _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); - public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, - String fqdn, - Set<AmqpProtocolVersion> supported, - NetworkConnection network, - long id) + public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedReply, + final long id, + final NetworkConnection network) { - this(appRegistry,fqdn,supported,id); + this(appRegistry, supported, defaultSupportedReply, id); setNetworkConnection(network); } - public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, - String fqdn, - Set<AmqpProtocolVersion> supported, - long id) + public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry, + final Set<AmqpProtocolVersion> supported, + final AmqpProtocolVersion defaultSupportedReply, + final long id) { + if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply)) + { + throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply + + ") to an unsupported protocol version initiation is itself not supported!"); + } + _id = id; _appRegistry = appRegistry; - _fqdn = fqdn; _supported = supported; - + _defaultSupportedReply = defaultSupportedReply; } @@ -198,6 +205,15 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) { _network = network; + SocketAddress address = _network.getLocalAddress(); + if (address instanceof InetSocketAddress) + { + _fqdn = ((InetSocketAddress) address).getHostName(); + } + else + { + throw new IllegalArgumentException("Unsupported socket address class: " + address); + } _sender = sender; } @@ -445,14 +461,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine ServerProtocolEngine newDelegate = null; - byte[] newestSupported = null; + byte[] supportedReplyBytes = null; + byte[] defaultSupportedReplyBytes = null; + AmqpProtocolVersion supportedReplyVersion = null; + //Check the supported versions for a header match, and if there is one save the + //delegate. Also save most recent supported version and associated reply header bytes for(int i = 0; newDelegate == null && i < _creators.length; i++) { - if(_supported.contains(_creators[i].getVersion())) { - newestSupported = _creators[i].getHeaderIdentifier(); + supportedReplyBytes = _creators[i].getHeaderIdentifier(); + supportedReplyVersion = _creators[i].getVersion(); byte[] compareBytes = _creators[i].getHeaderIdentifier(); boolean equal = true; for(int j = 0; equal && j<compareBytes.length; j++) @@ -464,12 +484,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine newDelegate = _creators[i].getProtocolEngine(); } } + + //If there is a configured default reply to an unsupported version initiation, + //then save the associated reply header bytes when we encounter them + if(_defaultSupportedReply != null && _creators[i].getVersion() == _defaultSupportedReply) + { + defaultSupportedReplyBytes = _creators[i].getHeaderIdentifier(); + } } - // If no delegate is found then send back the most recent support protocol version id + // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { - _sender.send(ByteBuffer.wrap(newestSupported)); + //if a default reply was specified use its reply header instead of the most recent supported version + if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion)) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Default reply to unsupported protocol version was configured, changing reply from " + + supportedReplyVersion + " to " + _defaultSupportedReply); + } + + supportedReplyBytes = defaultSupportedReplyBytes; + supportedReplyVersion = _defaultSupportedReply; + } + if(_logger.isDebugEnabled()) + { + _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion); + } + _sender.send(ByteBuffer.wrap(supportedReplyBytes)); _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); @@ -482,7 +525,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate = newDelegate; _header.flip(); - _delegate.setNetworkConnection(_network, _sender); _delegate.received(_header); if(msg.hasRemaining()) { |