summaryrefslogtreecommitdiff
path: root/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java')
-rw-r--r--broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java230
1 files changed, 0 insertions, 230 deletions
diff --git a/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
deleted file mode 100644
index 18980f440b..0000000000
--- a/broker/src/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
-import org.apache.qpid.ssl.BogusSSLContextFactory;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.util.SessionUtil;
-
-import java.io.IOException;
-
-
-/**
- * The protocol handler handles "protocol events" for all connections. The state
- * associated with an individual connection is accessed through the protocol session.
- *
- * We delegate all frame (message) processing to the AMQProtocolSession which wraps
- * the state for the connection.
- *
- */
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList
-{
- private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
-
- /**
- * The registry of all queues. This is passed to frame listeners when frame
- * events occur.
- */
- private final QueueRegistry _queueRegistry;
-
- /**
- * The registry of all exchanges. This is passed to frame listeners when frame
- * events occur.
- */
- private final ExchangeRegistry _exchangeRegistry;
-
- private boolean _useSSL;
-
- public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
- {
- IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
-
- _queueRegistry = registry.getQueueRegistry();
- _exchangeRegistry = registry.getExchangeRegistry();
- _logger.debug("AMQPFastProtocolHandler created");
- }
-
- public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry)
- {
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
-
- _logger.debug("AMQPFastProtocolHandler created");
- }
-
- protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
- {
- this(handler._queueRegistry, handler._exchangeRegistry);
- }
-
- public void sessionCreated(IoSession protocolSession) throws Exception
- {
- SessionUtil.initialize(protocolSession);
- final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-
- createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory);
- _logger.info("Protocol session created");
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
-
- ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
- getConfiguredObject(ConnectorConfiguration.class);
- if (connectorConfig.enableExecutorPool)
- {
- if (_useSSL)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(BogusSSLContextFactory.getInstance(true)));
- }
- protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- protocolSession.getFilterChain().addLast("protocolFilter", pcf);
- }
- }
-
- /**
- * Separated into its own, protected, method to allow easier reuse
- */
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
- {
- new AMQMinaProtocolSession(session, queues, exchanges, codec);
- }
-
- public void sessionOpened(IoSession protocolSession) throws Exception
- {
- _logger.info("Session opened");
- }
-
- public void sessionClosed(IoSession protocolSession) throws Exception
- {
- _logger.info("Protocol Session closed");
- final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- amqProtocolSession.closeSession();
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- _logger.debug("Protocol Session [" + this + "] idle: " + status);
- if(IdleStatus.WRITER_IDLE.equals(status))
- {
- //write heartbeat frame:
- session.write(HeartbeatBody.FRAME);
- }
- else if(IdleStatus.READER_IDLE.equals(status))
- {
- //failover:
- throw new IOException("Timed out while waiting for heartbeat from peer.");
- }
-
- }
-
- public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
- {
- AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- if (throwable instanceof AMQProtocolHeaderException)
- {
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be returned
- here. */
- int i = pv.length - 1;
- protocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
- protocolSession.close();
- _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
- }
- else if(throwable instanceof IOException)
- {
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
- }
- else
- {
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
- _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
- protocolSession.close();
- }
- }
-
- /**
- * Invoked when a message is received on a particular protocol session. Note that a
- * protocol session is directly tied to a particular physical connection.
- * @param protocolSession the protocol session that received the message
- * @param message the message itself (i.e. a decoded frame)
- * @throws Exception if the message cannot be processed
- */
- public void messageReceived(IoSession protocolSession, Object message) throws Exception
- {
- final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
-
- if (message instanceof AMQDataBlock)
- {
- amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
- }
- else if (message instanceof ByteBuffer)
- {
- throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message);
- }
- else
- {
- throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
- }
- }
-
- /**
- * Called after a message has been sent out on a particular protocol session
- * @param protocolSession the protocol session (i.e. connection) on which this
- * message was sent
- * @param object the message (frame) that was encoded and sent
- * @throws Exception if we want to indicate an error
- */
- public void messageSent(IoSession protocolSession, Object object) throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message sent: " + object);
- }
- }
-
- public boolean isUseSSL()
- {
- return _useSSL;
- }
-
- public void setUseSSL(boolean useSSL)
- {
- _useSSL = useSSL;
- }
-}