summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java284
1 files changed, 284 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
new file mode 100644
index 0000000000..0dbefd8798
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -0,0 +1,284 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+/**
+ * The protocol handler handles "protocol events" for all connections. The state
+ * associated with an individual connection is accessed through the protocol session.
+ *
+ * We delegate all frame (message) processing to the AMQProtocolSession which wraps
+ * the state for the connection.
+ */
+public class AMQPFastProtocolHandler extends IoHandlerAdapter
+{
+ private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
+
+ private final IApplicationRegistry _applicationRegistry;
+
+ private final int BUFFER_READ_LIMIT_SIZE;
+ private final int BUFFER_WRITE_LIMIT_SIZE;
+
+ public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
+ {
+ this(ApplicationRegistry.getInstance(applicationRegistryInstance));
+ }
+
+ public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+
+ // Read the configuration from the application registry
+ BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
+ BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
+
+ _logger.debug("AMQPFastProtocolHandler created");
+ }
+
+ protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
+ {
+ this(handler._applicationRegistry);
+ }
+
+ public void sessionCreated(IoSession protocolSession) throws Exception
+ {
+ SessionUtil.initialize(protocolSession);
+ final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
+
+ createSession(protocolSession, _applicationRegistry, codecFactory);
+ _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
+
+ final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
+ final ServerConfiguration config = _applicationRegistry.getConfiguration();
+
+ String keystorePath = config.getKeystorePath();
+ String keystorePassword = config.getKeystorePassword();
+ String certType = config.getCertType();
+ SSLContextFactory sslContextFactory = null;
+ boolean isSsl = false;
+ if (config.getEnableSSL() && isSSLClient(config, protocolSession))
+ {
+ sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ isSsl = true;
+ }
+ if (config.getEnableExecutorPool())
+ {
+ if (isSsl)
+ {
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(sslContextFactory.buildServerContext()));
+ }
+ protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
+ }
+ else
+ {
+ protocolSession.getFilterChain().addLast("protocolFilter", pcf);
+ if (isSsl)
+ {
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(sslContextFactory.buildServerContext()));
+ }
+ }
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
+ {
+ try
+ {
+// //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
+
+
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
+ writefilter.attach(chain);
+
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ _logger.info("Using IO Read/Write Filter Protection");
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+ }
+ }
+ }
+
+ /** Separated into its own, protected, method to allow easier reuse */
+ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
+ {
+ new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
+ }
+
+ public void sessionOpened(IoSession protocolSession) throws Exception
+ {
+ _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
+ }
+
+ public void sessionClosed(IoSession protocolSession) throws Exception
+ {
+ _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
+ final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
+ //fixme -- this can be null
+ if (amqProtocolSession != null)
+ {
+ try
+ {
+ amqProtocolSession.closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Caught AMQException whilst closingSession:" + e);
+ }
+ }
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ //write heartbeat frame:
+ session.write(HeartbeatBody.FRAME);
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ //failover:
+ throw new IOException("Timed out while waiting for heartbeat from peer.");
+ }
+
+ }
+
+ public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
+ {
+ AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
+ if (throwable instanceof AMQProtocolHeaderException)
+ {
+
+ protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+
+ protocolSession.close();
+
+ _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+ }
+ else if (throwable instanceof IOException)
+ {
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
+ }
+ else
+ {
+ _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
+
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+
+ protocolSession.write(closeBody.generateFrame(0));
+
+ protocolSession.close();
+ }
+ }
+
+ /**
+ * Invoked when a message is received on a particular protocol session. Note that a
+ * protocol session is directly tied to a particular physical connection.
+ *
+ * @param protocolSession the protocol session that received the message
+ * @param message the message itself (i.e. a decoded frame)
+ *
+ * @throws Exception if the message cannot be processed
+ */
+ public void messageReceived(IoSession protocolSession, Object message) throws Exception
+ {
+ final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
+
+ if (message instanceof AMQDataBlock)
+ {
+ amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
+
+ }
+ else if (message instanceof ByteBuffer)
+ {
+ throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message);
+ }
+ else
+ {
+ throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
+ }
+ }
+
+ /**
+ * Called after a message has been sent out on a particular protocol session
+ *
+ * @param protocolSession the protocol session (i.e. connection) on which this
+ * message was sent
+ * @param object the message (frame) that was encoded and sent
+ *
+ * @throws Exception if we want to indicate an error
+ */
+ public void messageSent(IoSession protocolSession, Object object) throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message sent: " + object);
+ }
+ }
+
+ protected boolean isSSLClient(ServerConfiguration connectionConfig,
+ IoSession protocolSession)
+ {
+ InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress();
+ return addr.getPort() == connectionConfig.getSSLPort();
+ }
+}