/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.client.protocol; import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.util.BytesDataOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in * terms of "message received" and so on. *
* There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public * API calls through which an individual connection can be manipulated. This protocol handler talks to the network * through MINA, in a behind the scenes role; it is not an exposed part of the client API. *
* There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions * in the event of failover. See below for more information about this. *
* Mina provides a session container that can be used to store/retrieve arbitrary objects as String named * attributes. A more convenient, type-safe, container for session data is provided in the form of * {@link AMQProtocolSession}. * *
* A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper * as described above). This event handler is different, because dealing with failover complicates things. To the * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection * and the protocol session data is held outside of the MINA IOSession. *
* This handler is responsible for setting up the filter chain to filter all events for this handler through. * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. *
* TODO Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
* failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
* AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
* be merged, although there is sense in keeping the session model separate. Will clarify things by having data
* held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler implements ProtocolEngine
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
/**
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
* instances and protocol handler instances.
*/
private AMQConnection _connection;
/** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
private AMQStateManager _stateManager;
/** Holds the method listeners, */
private final CopyOnWriteArraySet
* TODO Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
public void closed()
{
if (_connection.isClosed())
{
_logger.debug("Session closed called by client");
}
else
{
// Use local variable to keep flag whether fail-over allowed or not,
// in order to execute AMQConnection#exceptionRecievedout out of synchronization block,
// otherwise it might deadlock with failover mutex
boolean failoverNotAllowed = false;
synchronized (this)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Session closed called with failover state " + _failoverState);
}
// reconnetablility was introduced here so as not to disturb the client as they have made their intentions
// known through the policy settings.
if (_failoverState == FailoverState.NOT_STARTED)
{
// close the sender
try
{
_sender.close();
}
catch (Exception e)
{
_logger.warn("Exception occured on closing the sender", e);
}
if (_connection.failoverAllowed())
{
_failoverState = FailoverState.IN_PROGRESS;
_logger.debug("FAILOVER STARTING");
startFailoverThread();
}
else if (_connection.isConnected())
{
failoverNotAllowed = true;
if (_logger.isDebugEnabled())
{
_logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
}
}
else
{
_logger.debug("We are in process of establishing the initial connection");
}
}
else
{
_logger.debug("Not starting the failover thread as state currently " + _failoverState);
}
}
if (failoverNotAllowed)
{
_connection.exceptionReceived(new AMQDisconnectedException(
"Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
}
}
if (_logger.isDebugEnabled())
{
_logger.debug("Protocol Session [" + this + "] closed");
}
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
if(!_connection.isClosed())
{
final Thread failoverThread;
try
{
failoverThread = Threading.getThreadFactory().createThread(_failoverHandler);
}
catch (Exception e)
{
throw new RuntimeException("Failed to create thread", e);
}
failoverThread.setName("Failover");
// Do not inherit daemon-ness from current thread as this can be a daemon
// thread such as a AnonymousIoService thread.
failoverThread.setDaemon(false);
failoverThread.start();
}
}
public void readerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
public void writerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: writer");
writeFrame(HeartbeatBody.FRAME);
_heartbeatListener.heartbeatSent();
}
/**
* Invoked when any exception is thrown by the NetworkDriver
*/
public void exception(Throwable cause)
{
boolean causeIsAConnectionProblem =
cause instanceof AMQConnectionClosedException ||
cause instanceof IOException ||
cause instanceof TransportException;
if (causeIsAConnectionProblem)
{
//ensure the IoSender and IoReceiver are closed
try
{
_network.close();
}
catch (Exception e)
{
//ignore
}
}
FailoverState state = getFailoverState();
if (state == FailoverState.NOT_STARTED)
{
if (causeIsAConnectionProblem)
{
_logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
}
else
{
_connection.exceptionReceived(cause);
}
// FIXME Need to correctly handle other exceptions. Things like ...
// AMQChannelClosedException
// which will cause the JMSSession to end due to a channel close and so that Session needs
// to be removed from the map so we can correctly still call close without an exception when trying to close
// the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
}
// we reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation
else if (state == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
else
{
_logger.warn("Exception caught by protocol handler: " + cause, cause);
}
}
/**
* There are two cases where we have other threads potentially blocking for events to be handled by this class.
* These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
* of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
* This should be called only when the exception is fatal for the connection.
*
* @param e the exception to propagate
*
* @see #propagateExceptionToFrameListeners
*/
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
propagateExceptionToFrameListeners(e);
}
/**
* This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any
* protocol level waits.
*
* This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
* stop waiting and relinquish the Failover lock. See {@link FailoverHandler}.
*
* Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
* their protocol request and so listen again for the correct frame.
*
* @param e the exception to propagate
*/
public void propagateExceptionToFrameListeners(Exception e)
{
synchronized (_frameListeners)
{
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
final AMQMethodListener ml = (AMQMethodListener) it.next();
ml.error(e);
}
}
}
}
public void notifyFailoverStarting()
{
// Set the last exception in the sync block to ensure the ordering with add.
// either this gets done and the add does the ml.error
// or the add completes first and the iterator below will do ml.error
synchronized (_frameListeners)
{
_lastFailoverException = new FailoverException("Failing over about to start");
}
//Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
// interrupted unless failover cannot restore the state.
propagateExceptionToFrameListeners(_lastFailoverException);
}
public void failoverInProgress()
{
_lastFailoverException = null;
}
private static int _messageReceivedCount;
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
_lastReadTime = System.currentTimeMillis();
try
{
final ArrayList
* If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
* anyway.
*
* @param timeout The timeout to wait for an acknowledgment to the close request.
*
* @throws AMQException If the close fails for any reason.
*/
public void closeConnection(long timeout) throws AMQException
{
if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
// Connection is already closed then don't do a syncWrite
try
{
final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
new AMQShortString("JMS client is closing the connection."), 0, 0);
final AMQFrame frame = body.generateFrame(0);
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
_network.close();
closed();
}
catch (AMQTimeoutException e)
{
closed();
}
catch (FailoverException e)
{
_logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
}
}
}
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
return _readBytes;
}
/** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
return _writtenBytes;
}
public void failover(String host, int port)
{
_failoverHandler.setHost(host);
_failoverHandler.setPort(port);
// see javadoc for FailoverHandler to see rationale for separate thread
startFailoverThread();
}
public void blockUntilNotFailingOver() throws InterruptedException
{
synchronized(_failoverLatchChange)
{
if (_failoverLatch != null)
{
if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
{
}
}
}
}
public AMQShortString generateQueueName()
{
return _protocolSession.generateQueueName();
}
public CountDownLatch getFailoverLatch()
{
return _failoverLatch;
}
public void setFailoverLatch(CountDownLatch failoverLatch)
{
synchronized (_failoverLatchChange)
{
_failoverLatch = failoverLatch;
}
}
public AMQConnection getConnection()
{
return _connection;
}
public AMQStateManager getStateManager()
{
return _stateManager;
}
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
_stateManager.setProtocolSession(_protocolSession);
}
public AMQProtocolSession getProtocolSession()
{
return _protocolSession;
}
synchronized FailoverState getFailoverState()
{
return _failoverState;
}
public synchronized void setFailoverState(FailoverState failoverState)
{
_failoverState= failoverState;
}
public byte getProtocolMajorVersion()
{
return _protocolSession.getProtocolMajorVersion();
}
public byte getProtocolMinorVersion()
{
return _protocolSession.getProtocolMinorVersion();
}
public MethodRegistry getMethodRegistry()
{
return _protocolSession.getMethodRegistry();
}
public ProtocolVersion getProtocolVersion()
{
return _protocolSession.getProtocolVersion();
}
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
return _network.getLocalAddress();
}
public void setNetworkConnection(NetworkConnection network)
{
setNetworkConnection(network, network.getSender());
}
public void setNetworkConnection(NetworkConnection network, Sender