diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java | 60 |
1 files changed, 32 insertions, 28 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index d8dbf97e49..0dbefd8798 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; @@ -34,15 +37,19 @@ import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; import org.apache.qpid.ssl.SSLContextFactory; -import java.io.IOException; -import java.net.InetSocketAddress; - /** * The protocol handler handles "protocol events" for all connections. The state * associated with an individual connection is accessed through the protocol session. @@ -56,9 +63,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter private final IApplicationRegistry _applicationRegistry; - private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144"; - private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144"; - private final int BUFFER_READ_LIMIT_SIZE; private final int BUFFER_WRITE_LIMIT_SIZE; @@ -72,8 +76,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter _applicationRegistry = applicationRegistry; // Read the configuration from the application registry - BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE)); - BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE)); + BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit(); + BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit(); _logger.debug("AMQPFastProtocolHandler created"); } @@ -92,17 +96,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); - - ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). - getConfiguredObject(ConnectorConfiguration.class); - if (connectorConfig.enableExecutorPool) + final ServerConfiguration config = _applicationRegistry.getConfiguration(); + + String keystorePath = config.getKeystorePath(); + String keystorePassword = config.getKeystorePassword(); + String certType = config.getCertType(); + SSLContextFactory sslContextFactory = null; + boolean isSsl = false; + if (config.getEnableSSL() && isSSLClient(config, protocolSession)) { - if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession)) + sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + isSsl = true; + } + if (config.getEnableExecutorPool()) + { + if (isSsl) { - String keystorePath = connectorConfig.keystorePath; - String keystorePassword = connectorConfig.keystorePassword; - String certType = connectorConfig.certType; - SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", new SSLFilter(sslContextFactory.buildServerContext())); } @@ -111,19 +120,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter else { protocolSession.getFilterChain().addLast("protocolFilter", pcf); - if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession)) + if (isSsl) { - String keystorePath = connectorConfig.keystorePath; - String keystorePassword = connectorConfig.keystorePassword; - String certType = connectorConfig.certType; - SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", new SSLFilter(sslContextFactory.buildServerContext())); } - } - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false)) + if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled()) { try { @@ -271,10 +275,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } } - protected boolean isSSLClient(ConnectorConfiguration connectionConfig, + protected boolean isSSLClient(ServerConfiguration connectionConfig, IoSession protocolSession) { InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress(); - return addr.getPort() == connectionConfig.sslPort; + return addr.getPort() == connectionConfig.getSSLPort(); } } |