diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java | 284 |
1 files changed, 0 insertions, 284 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java deleted file mode 100644 index 0dbefd8798..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.codec.QpidProtocolCodecFilter; -import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.util.SessionUtil; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.ssl.SSLContextFactory; - -/** - * The protocol handler handles "protocol events" for all connections. The state - * associated with an individual connection is accessed through the protocol session. - * - * We delegate all frame (message) processing to the AMQProtocolSession which wraps - * the state for the connection. - */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter -{ - private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); - - private final IApplicationRegistry _applicationRegistry; - - private final int BUFFER_READ_LIMIT_SIZE; - private final int BUFFER_WRITE_LIMIT_SIZE; - - public AMQPFastProtocolHandler(Integer applicationRegistryInstance) - { - this(ApplicationRegistry.getInstance(applicationRegistryInstance)); - } - - public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) - { - _applicationRegistry = applicationRegistry; - - // Read the configuration from the application registry - BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit(); - BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit(); - - _logger.debug("AMQPFastProtocolHandler created"); - } - - protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler) - { - this(handler._applicationRegistry); - } - - public void sessionCreated(IoSession protocolSession) throws Exception - { - SessionUtil.initialize(protocolSession); - final AMQCodecFactory codecFactory = new AMQCodecFactory(true); - - createSession(protocolSession, _applicationRegistry, codecFactory); - _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); - - final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); - final ServerConfiguration config = _applicationRegistry.getConfiguration(); - - String keystorePath = config.getKeystorePath(); - String keystorePassword = config.getKeystorePassword(); - String certType = config.getCertType(); - SSLContextFactory sslContextFactory = null; - boolean isSsl = false; - if (config.getEnableSSL() && isSSLClient(config, protocolSession)) - { - sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); - isSsl = true; - } - if (config.getEnableExecutorPool()) - { - if (isSsl) - { - protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", - new SSLFilter(sslContextFactory.buildServerContext())); - } - protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - protocolSession.getFilterChain().addLast("protocolFilter", pcf); - if (isSsl) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(sslContextFactory.buildServerContext())); - } - } - - if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled()) - { - try - { -// //Add IO Protection Filters - IoFilterChain chain = protocolSession.getFilterChain(); - - - protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE); - writefilter.attach(chain); - - protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - _logger.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - } - - /** Separated into its own, protected, method to allow easier reuse */ - protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException - { - new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); - } - - public void sessionOpened(IoSession protocolSession) throws Exception - { - _logger.info("Session opened for:" + protocolSession.getRemoteAddress()); - } - - public void sessionClosed(IoSession protocolSession) throws Exception - { - _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress()); - final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); - //fixme -- this can be null - if (amqProtocolSession != null) - { - try - { - amqProtocolSession.closeSession(); - } - catch (AMQException e) - { - _logger.error("Caught AMQException whilst closingSession:" + e); - } - } - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress()); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - //write heartbeat frame: - session.write(HeartbeatBody.FRAME); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - //failover: - throw new IOException("Timed out while waiting for heartbeat from peer."); - } - - } - - public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception - { - AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); - if (throwable instanceof AMQProtocolHeaderException) - { - - protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - - protocolSession.close(); - - _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable); - } - else if (throwable instanceof IOException) - { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable); - } - else - { - _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); - - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion()); - ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); - - protocolSession.write(closeBody.generateFrame(0)); - - protocolSession.close(); - } - } - - /** - * Invoked when a message is received on a particular protocol session. Note that a - * protocol session is directly tied to a particular physical connection. - * - * @param protocolSession the protocol session that received the message - * @param message the message itself (i.e. a decoded frame) - * - * @throws Exception if the message cannot be processed - */ - public void messageReceived(IoSession protocolSession, Object message) throws Exception - { - final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); - - if (message instanceof AMQDataBlock) - { - amqProtocolSession.dataBlockReceived((AMQDataBlock) message); - - } - else if (message instanceof ByteBuffer) - { - throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message); - } - else - { - throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); - } - } - - /** - * Called after a message has been sent out on a particular protocol session - * - * @param protocolSession the protocol session (i.e. connection) on which this - * message was sent - * @param object the message (frame) that was encoded and sent - * - * @throws Exception if we want to indicate an error - */ - public void messageSent(IoSession protocolSession, Object object) throws Exception - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Message sent: " + object); - } - } - - protected boolean isSSLClient(ServerConfiguration connectionConfig, - IoSession protocolSession) - { - InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress(); - return addr.getPort() == connectionConfig.getSSLPort(); - } -} |