summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java60
1 files changed, 32 insertions, 28 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index d8dbf97e49..0dbefd8798 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
@@ -34,15 +37,19 @@ import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.ssl.SSLContextFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
/**
* The protocol handler handles "protocol events" for all connections. The state
* associated with an individual connection is accessed through the protocol session.
@@ -56,9 +63,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
private final IApplicationRegistry _applicationRegistry;
- private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144";
- private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144";
-
private final int BUFFER_READ_LIMIT_SIZE;
private final int BUFFER_WRITE_LIMIT_SIZE;
@@ -72,8 +76,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
_applicationRegistry = applicationRegistry;
// Read the configuration from the application registry
- BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE));
- BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE));
+ BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
+ BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
_logger.debug("AMQPFastProtocolHandler created");
}
@@ -92,17 +96,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
_logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
-
- ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
- getConfiguredObject(ConnectorConfiguration.class);
- if (connectorConfig.enableExecutorPool)
+ final ServerConfiguration config = _applicationRegistry.getConfiguration();
+
+ String keystorePath = config.getKeystorePath();
+ String keystorePassword = config.getKeystorePassword();
+ String certType = config.getCertType();
+ SSLContextFactory sslContextFactory = null;
+ boolean isSsl = false;
+ if (config.getEnableSSL() && isSSLClient(config, protocolSession))
{
- if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession))
+ sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ isSsl = true;
+ }
+ if (config.getEnableExecutorPool())
+ {
+ if (isSsl)
{
- String keystorePath = connectorConfig.keystorePath;
- String keystorePassword = connectorConfig.keystorePassword;
- String certType = connectorConfig.certType;
- SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
new SSLFilter(sslContextFactory.buildServerContext()));
}
@@ -111,19 +120,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
else
{
protocolSession.getFilterChain().addLast("protocolFilter", pcf);
- if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession))
+ if (isSsl)
{
- String keystorePath = connectorConfig.keystorePath;
- String keystorePassword = connectorConfig.keystorePassword;
- String certType = connectorConfig.certType;
- SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
new SSLFilter(sslContextFactory.buildServerContext()));
}
-
}
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false))
+ if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
{
try
{
@@ -271,10 +275,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
}
- protected boolean isSSLClient(ConnectorConfiguration connectionConfig,
+ protected boolean isSSLClient(ServerConfiguration connectionConfig,
IoSession protocolSession)
{
InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress();
- return addr.getPort() == connectionConfig.sslPort;
+ return addr.getPort() == connectionConfig.getSSLPort();
}
}