diff options
Diffstat (limited to 'client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 553 |
1 files changed, 0 insertions, 553 deletions
diff --git a/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java deleted file mode 100644 index eab9084717..0000000000 --- a/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ /dev/null @@ -1,553 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.log4j.Logger; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.failover.FailoverHandler; -import org.apache.qpid.client.failover.FailoverState; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.ssl.BogusSSLContextFactory; - -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - -public class AMQProtocolHandler extends IoHandlerAdapter -{ - private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); - - /** - * The connection that this protocol handler is associated with. There is a 1-1 - * mapping between connection instances and protocol handler instances. - */ - private AMQConnection _connection; - - /** - * Used only when determining whether to add the SSL filter or not. This should be made more - * generic in future since we will potentially have many transport layer options - */ - private boolean _useSSL; - - /** - * Our wrapper for a protocol session that provides access to session values - * in a typesafe manner. - */ - private volatile AMQProtocolSession _protocolSession; - - private AMQStateManager _stateManager = new AMQStateManager(); - - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); - - /** - * We create the failover handler when the session is created since it needs a reference to the IoSession in order - * to be able to send errors during failover back to the client application. The session won't be available in the - * case where we failing over due to a Connection.Redirect message from the broker. - */ - private FailoverHandler _failoverHandler; - - /** - * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly - * attempting failover where it is failing. - */ - private FailoverState _failoverState = FailoverState.NOT_STARTED; - - private CountDownLatch _failoverLatch; - - public AMQProtocolHandler(AMQConnection con) - { - _connection = con; - - // We add a proxy for the state manager so that we can substitute the state manager easily in this class. - // We substitute the state manager when performing failover - _frameListeners.add(new AMQMethodListener() - { - public boolean methodReceived(AMQMethodEvent evt) throws AMQException - { - return _stateManager.methodReceived(evt); - } - - public void error(Exception e) - { - _stateManager.error(e); - } - }); - } - - public boolean isUseSSL() - { - return _useSSL; - } - - public void setUseSSL(boolean useSSL) - { - _useSSL = useSSL; - } - - public void sessionCreated(IoSession session) throws Exception - { - _logger.debug("Protocol session created for session " + System.identityHashCode(session)); - _failoverHandler = new FailoverHandler(this, session); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); - - if (Boolean.getBoolean("amqj.shared_read_write_pool")) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_useSSL) - { - //FIXME: Bogus context cannot be used in production. - SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory.getInstance(false)); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); - } - - _protocolSession = new AMQProtocolSession(this, session, _connection); - _protocolSession.init(); - } - - public void sessionOpened(IoSession session) throws Exception - { - System.setProperty("foo", "bar"); - } - - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. - * - * @param session - * @throws Exception - */ - public void sessionClosed(IoSession session) throws Exception - { - if (_connection.isClosed()) - { - _logger.info("Session closed called by client"); - } - else - { - _logger.info("Session closed called with failover state currently " + _failoverState); - - //reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) - { - _logger.info("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.info("Not starting failover as state currently " + _failoverState); - } - } - else - { - _logger.info("Failover not allowed by policy."); - - if (_logger.isDebugEnabled()) - { - _logger.debug(_connection.getFailoverPolicy().toString()); - } - - if (_failoverState != FailoverState.IN_PROGRESS) - { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); - } - else - { - _logger.info("sessionClose() failover in progress"); - } - } - } - - _logger.info("Protocol Session [" + this + "] closed"); - } - - /** - * See {@link FailoverHandler} to see rationale for separate thread. - */ - private void startFailoverThread() - { - Thread failoverThread = new Thread(_failoverHandler); - failoverThread.setName("Failover"); - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.setDaemon(false); - failoverThread.start(); - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - //write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - //failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - if (_failoverState == FailoverState.NOT_STARTED) - { - //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) - if (cause instanceof AMQConnectionClosedException) - { - _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attemp failover - - sessionClosed(session); - } - } - // we reach this point if failover was attempted and failed therefore we need to let the calling app - // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) - { - _logger.error("Exception caught by protocol handler: " + cause, cause); - // we notify the state manager of the error in case we have any clients waiting on a state - // change. Those "waiters" will be interrupted and can handle the exception - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); - _connection.exceptionReceived(cause); - } - } - - /** - * There are two cases where we have other threads potentially blocking for events to be handled by this - * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - * react appropriately. - * - * @param e the exception to propagate - */ - public void propagateExceptionToWaiters(Exception e) - { - _stateManager.error(e); - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); - } - } - - private static int _messageReceivedCount; - - public void messageReceived(IoSession session, Object message) throws Exception - { - - if (_messageReceivedCount++ % 1000 == 0) - { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - Iterator it = _frameListeners.iterator(); - AMQFrame frame = (AMQFrame) message; - - HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody); - - if (frame.bodyFrame instanceof AMQMethodBody) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Method frame received: " + frame); - } - - final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession); - try - { - boolean wasAnyoneInterested = false; - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); - } - } - catch (AMQException e) - { - it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - exceptionCaught(session, e); - } - } - else if (frame.bodyFrame instanceof ContentHeaderBody) - { - _protocolSession.messageContentHeaderReceived(frame.channel, - (ContentHeaderBody) frame.bodyFrame); - } - else if (frame.bodyFrame instanceof ContentBody) - { - _protocolSession.messageContentBodyReceived(frame.channel, - (ContentBody) frame.bodyFrame); - } - else if (frame.bodyFrame instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat"); - } - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); - } - - private static int _messagesOut; - - public void messageSent(IoSession session, Object message) throws Exception - { - if (_messagesOut++ % 1000 == 0) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - _connection.bytesSent(session.getWrittenBytes()); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sent frame " + message); - } - } - - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - - public void attainState(AMQState s) throws AMQException - { - _stateManager.attainState(s); - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - _protocolSession.writeFrame(frame); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - _protocolSession.writeFrame(frame, wait); - } - - /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) - throws AMQException - { - try - { - _frameListeners.add(listener); - _protocolSession.writeFrame(frame); - - AMQMethodEvent e = listener.blockForFrame(); - return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener - } - finally - { - // If we don't remove the listener then no-one will - _frameListeners.remove(listener); - } - - } - - /** - * More convenient method to write a frame and wait for it's response. - */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException - { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.channel, responseClass)); - } - - /** - * Convenience method to register an AMQSession with the protocol handler. Registering - * a session with the protocol handler will ensure that messages are delivered to the - * consumer(s) on that session. - * - * @param channelId the channel id of the session - * @param session the session instance. - */ - public void addSessionByChannel(int channelId, AMQSession session) - { - _protocolSession.addSessionByChannel(channelId, session); - } - - /** - * Convenience method to deregister an AMQSession with the protocol handler. - * - * @param channelId then channel id of the session - */ - public void removeSessionByChannel(int channelId) - { - _protocolSession.removeSessionByChannel(channelId); - } - - public void closeSession(AMQSession session) throws AMQException - { - _protocolSession.closeSession(session); - } - - public void closeConnection() throws AMQException - { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - - final AMQFrame frame = ConnectionCloseBody.createAMQFrame( - 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0); - syncWrite(frame, ConnectionCloseOkBody.class); - - _protocolSession.closeProtocolSession(); - } - - /** - * @return the number of bytes read from this protocol session - */ - public long getReadBytes() - { - return _protocolSession.getIoSession().getReadBytes(); - } - - /** - * @return the number of bytes written to this protocol session - */ - public long getWrittenBytes() - { - return _protocolSession.getIoSession().getWrittenBytes(); - } - - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - - public void blockUntilNotFailingOver() throws InterruptedException - { - if (_failoverLatch != null) - { - _failoverLatch.await(); - } - } - - public String generateQueueName() - { - return _protocolSession.generateQueueName(); - } - - public CountDownLatch getFailoverLatch() - { - return _failoverLatch; - } - - public void setFailoverLatch(CountDownLatch failoverLatch) - { - _failoverLatch = failoverLatch; - } - - public AMQConnection getConnection() - { - return _connection; - } - - public AMQStateManager getStateManager() - { - return _stateManager; - } - - public void setStateManager(AMQStateManager stateManager) - { - _stateManager = stateManager; - } - - FailoverState getFailoverState() - { - return _failoverState; - } - - public void setFailoverState(FailoverState failoverState) - { - _failoverState = failoverState; - } -} |