summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java86
1 files changed, 64 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index a71d396919..ce20690f66 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -21,6 +21,11 @@
package org.apache.qpid.server.protocol;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -29,10 +34,6 @@ import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
@@ -44,29 +45,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private final AmqpProtocolVersion _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
- Set<AmqpProtocolVersion> supported,
- NetworkConnection network,
- long id)
+ public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedReply,
+ final long id,
+ final NetworkConnection network)
{
- this(appRegistry,fqdn,supported,id);
+ this(appRegistry, supported, defaultSupportedReply, id);
setNetworkConnection(network);
}
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
- Set<AmqpProtocolVersion> supported,
- long id)
+ public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedReply,
+ final long id)
{
+ if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
+ {
+ throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+ + ") to an unsupported protocol version initiation is itself not supported!");
+ }
+
_id = id;
_appRegistry = appRegistry;
- _fqdn = fqdn;
_supported = supported;
-
+ _defaultSupportedReply = defaultSupportedReply;
}
@@ -198,6 +205,15 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
_network = network;
+ SocketAddress address = _network.getLocalAddress();
+ if (address instanceof InetSocketAddress)
+ {
+ _fqdn = ((InetSocketAddress) address).getHostName();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported socket address class: " + address);
+ }
_sender = sender;
}
@@ -445,14 +461,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
ServerProtocolEngine newDelegate = null;
- byte[] newestSupported = null;
+ byte[] supportedReplyBytes = null;
+ byte[] defaultSupportedReplyBytes = null;
+ AmqpProtocolVersion supportedReplyVersion = null;
+ //Check the supported versions for a header match, and if there is one save the
+ //delegate. Also save most recent supported version and associated reply header bytes
for(int i = 0; newDelegate == null && i < _creators.length; i++)
{
-
if(_supported.contains(_creators[i].getVersion()))
{
- newestSupported = _creators[i].getHeaderIdentifier();
+ supportedReplyBytes = _creators[i].getHeaderIdentifier();
+ supportedReplyVersion = _creators[i].getVersion();
byte[] compareBytes = _creators[i].getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
@@ -464,12 +484,35 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
newDelegate = _creators[i].getProtocolEngine();
}
}
+
+ //If there is a configured default reply to an unsupported version initiation,
+ //then save the associated reply header bytes when we encounter them
+ if(_defaultSupportedReply != null && _creators[i].getVersion() == _defaultSupportedReply)
+ {
+ defaultSupportedReplyBytes = _creators[i].getHeaderIdentifier();
+ }
}
- // If no delegate is found then send back the most recent support protocol version id
+ // If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
- _sender.send(ByteBuffer.wrap(newestSupported));
+ //if a default reply was specified use its reply header instead of the most recent supported version
+ if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion))
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Default reply to unsupported protocol version was configured, changing reply from "
+ + supportedReplyVersion + " to " + _defaultSupportedReply);
+ }
+
+ supportedReplyBytes = defaultSupportedReplyBytes;
+ supportedReplyVersion = _defaultSupportedReply;
+ }
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion);
+ }
+ _sender.send(ByteBuffer.wrap(supportedReplyBytes));
_sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
@@ -482,7 +525,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate = newDelegate;
_header.flip();
- _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{